Skip to content

Commit 314e131

Browse files
committed
FEAT: Add Operation throughput and latency metrics by mbean.
1 parent 132254c commit 314e131

9 files changed

Lines changed: 331 additions & 0 deletions

src/main/java/net/spy/memcached/MemcachedConnection.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@
5050
import net.spy.memcached.compat.SpyObject;
5151
import net.spy.memcached.compat.log.LoggerFactory;
5252
import net.spy.memcached.internal.ReconnDelay;
53+
import net.spy.memcached.metrics.OpLatencyMonitor;
54+
import net.spy.memcached.metrics.OpThroughputMonitor;
5355
import net.spy.memcached.ops.KeyedOperation;
5456
import net.spy.memcached.ops.MultiOperationCallback;
5557
import net.spy.memcached.ops.Operation;
@@ -990,6 +992,7 @@ private void handleReads(MemcachedNode qa)
990992
throw new IllegalStateException("No read operation.");
991993
}
992994
currentOp.readFromBuffer(rbuf);
995+
OpLatencyMonitor.getInstance().recordLatency(currentOp.getStartTime());
993996
if (currentOp.getState() == OperationState.COMPLETE) {
994997
getLogger().debug("Completed read op: %s and giving the next %d bytes",
995998
currentOp, rbuf.remaining());
@@ -1519,6 +1522,7 @@ public String toString() {
15191522
* @param op
15201523
*/
15211524
public static void opTimedOut(Operation op) {
1525+
OpThroughputMonitor.getInstance().addTimeOutedOpCount(1);
15221526
MemcachedConnection.setTimeout(op, true);
15231527
}
15241528

@@ -1528,6 +1532,7 @@ public static void opTimedOut(Operation op) {
15281532
* @param ops
15291533
*/
15301534
public static void opsTimedOut(Collection<Operation> ops) {
1535+
OpThroughputMonitor.getInstance().addTimeOutedOpCount(ops.size());
15311536
Collection<String> timedOutNodes = new HashSet<>();
15321537
for (Operation op : ops) {
15331538
try {
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package net.spy.memcached.metrics;
2+
3+
class LatencyMetricsSnapShot {
4+
private static final LatencyMetricsSnapShot EMPTY = new LatencyMetricsSnapShot(0, 0, 0, 0, 0, 0);
5+
6+
private final long avgLatency;
7+
private final long minLatency;
8+
private final long maxLatency;
9+
private final long p25Latency;
10+
private final long p50Latency;
11+
private final long p75Latency;
12+
private final long timestamp; // 캐시 생성 시간
13+
14+
LatencyMetricsSnapShot(long avg, long min, long max, long p25, long p50, long p75) {
15+
this.avgLatency = avg;
16+
this.minLatency = min;
17+
this.maxLatency = max;
18+
this.p25Latency = p25;
19+
this.p50Latency = p50;
20+
this.p75Latency = p75;
21+
this.timestamp = System.currentTimeMillis();
22+
}
23+
24+
public static LatencyMetricsSnapShot empty() {
25+
return EMPTY;
26+
}
27+
28+
public long getAvgLatency() {
29+
return avgLatency;
30+
}
31+
32+
public long getMinLatency() {
33+
return minLatency;
34+
}
35+
36+
public long getMaxLatency() {
37+
return maxLatency;
38+
}
39+
40+
public long getP25Latency() {
41+
return p25Latency;
42+
}
43+
44+
public long getP50Latency() {
45+
return p50Latency;
46+
}
47+
48+
public long getP75Latency() {
49+
return p75Latency;
50+
}
51+
52+
public long getTimestamp() {
53+
return timestamp;
54+
}
55+
}
Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
package net.spy.memcached.metrics;
2+
3+
import java.util.ArrayList;
4+
import java.util.List;
5+
import java.util.concurrent.TimeUnit;
6+
import java.util.concurrent.atomic.AtomicInteger;
7+
import java.util.concurrent.atomic.AtomicReference;
8+
import java.util.concurrent.atomic.AtomicReferenceArray;
9+
10+
import net.spy.memcached.ArcusMBeanServer;
11+
12+
public final class OpLatencyMonitor implements OpLatencyMonitorMBean {
13+
14+
private static final OpLatencyMonitor INSTANCE = new OpLatencyMonitor();
15+
private static final long CACHE_DURATION = 2000; // 2초 캐시
16+
private static final int WINDOW_SIZE = 10_000;
17+
18+
private final AtomicReferenceArray<Long> latencies = new AtomicReferenceArray<>(WINDOW_SIZE);
19+
private final AtomicInteger currentIndex = new AtomicInteger(0);
20+
private final AtomicInteger count = new AtomicInteger(0);
21+
private final AtomicReference<LatencyMetricsSnapShot> cachedMetrics
22+
= new AtomicReference<>(LatencyMetricsSnapShot.empty());
23+
private final boolean enabled;
24+
25+
private OpLatencyMonitor() {
26+
if (System.getProperty("arcus.mbean", "false").toLowerCase().equals("false")) {
27+
enabled = false;
28+
return;
29+
}
30+
enabled = true;
31+
for (int i = 0; i < WINDOW_SIZE; i++) {
32+
latencies.set(i, 0L);
33+
}
34+
35+
try {
36+
ArcusMBeanServer mbs = ArcusMBeanServer.getInstance();
37+
mbs.registMBean(this, this.getClass().getPackage().getName()
38+
+ ":type=" + this.getClass().getSimpleName());
39+
} catch (Exception e) {
40+
throw new RuntimeException("Failed to register MBean", e);
41+
}
42+
}
43+
44+
public static OpLatencyMonitor getInstance() {
45+
return INSTANCE;
46+
}
47+
48+
public void recordLatency(long startNanos) {
49+
if (!enabled) {
50+
return;
51+
}
52+
long latencyMicros = TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - startNanos);
53+
int index = currentIndex.getAndUpdate(i -> (i + 1) % WINDOW_SIZE);
54+
latencies.lazySet(index, latencyMicros);
55+
56+
if (count.get() < WINDOW_SIZE) {
57+
count.incrementAndGet();
58+
}
59+
}
60+
61+
// 모든 메트릭을 한 번에 계산하고 캐시하는 메서드
62+
private LatencyMetricsSnapShot computeMetrics() {
63+
int currentCount = count.get();
64+
if (currentCount == 0) {
65+
return LatencyMetricsSnapShot.empty();
66+
}
67+
68+
// 현재 데이터를 배열로 복사
69+
List<Long> sortedLatencies = new ArrayList<>(currentCount);
70+
int startIndex = currentIndex.get();
71+
72+
for (int i = 0; i < currentCount; i++) {
73+
int idx = (startIndex - i + WINDOW_SIZE) % WINDOW_SIZE;
74+
long value = latencies.get(idx);
75+
if (value > 0) {
76+
sortedLatencies.add(value);
77+
}
78+
}
79+
80+
if (sortedLatencies.isEmpty()) {
81+
return LatencyMetricsSnapShot.empty();
82+
}
83+
84+
sortedLatencies.sort(Long::compareTo);
85+
86+
// 모든 메트릭을 한 번에 계산
87+
long avg = sortedLatencies.stream().mapToLong(Long::longValue).sum() / sortedLatencies.size();
88+
long min = sortedLatencies.get(0);
89+
long max = sortedLatencies.get(sortedLatencies.size() - 1);
90+
long p25 = sortedLatencies.get((int) Math.ceil((sortedLatencies.size() * 25.0) / 100.0) - 1);
91+
long p50 = sortedLatencies.get((int) Math.ceil((sortedLatencies.size() * 50.0) / 100.0) - 1);
92+
long p75 = sortedLatencies.get((int) Math.ceil((sortedLatencies.size() * 75.0) / 100.0) - 1);
93+
94+
return new LatencyMetricsSnapShot(avg, min, max, p25, p50, p75);
95+
}
96+
97+
// 캐시된 메트릭을 가져오거나 필요시 새로 계산
98+
private LatencyMetricsSnapShot getMetricsSnapshot() {
99+
LatencyMetricsSnapShot current = cachedMetrics.get();
100+
long now = System.currentTimeMillis();
101+
102+
// 캐시가 유효한지 확인
103+
if (now - current.getTimestamp() < CACHE_DURATION) {
104+
return current;
105+
}
106+
107+
// 새로운 메트릭 계산 및 캐시 업데이트
108+
LatencyMetricsSnapShot newMetrics = computeMetrics();
109+
cachedMetrics.set(newMetrics);
110+
return newMetrics;
111+
}
112+
113+
@Override
114+
public long getAverageLatencyMicros() {
115+
return getMetricsSnapshot().getAvgLatency();
116+
}
117+
118+
@Override
119+
public long getMinLatencyMicros() {
120+
return getMetricsSnapshot().getMinLatency();
121+
}
122+
123+
@Override
124+
public long getMaxLatencyMicros() {
125+
return getMetricsSnapshot().getMaxLatency();
126+
}
127+
128+
@Override
129+
public long get25thPercentileLatencyMicros() {
130+
return getMetricsSnapshot().getP25Latency();
131+
}
132+
133+
@Override
134+
public long get50thPercentileLatencyMicros() {
135+
return getMetricsSnapshot().getP50Latency();
136+
}
137+
138+
@Override
139+
public long get75thPercentileLatencyMicros() {
140+
return getMetricsSnapshot().getP75Latency();
141+
}
142+
143+
@Override
144+
public void resetStatistics() {
145+
count.set(0);
146+
currentIndex.set(0);
147+
cachedMetrics.set(LatencyMetricsSnapShot.empty());
148+
}
149+
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package net.spy.memcached.metrics;
2+
3+
public interface OpLatencyMonitorMBean {
4+
long getAverageLatencyMicros();
5+
6+
long getMaxLatencyMicros();
7+
8+
long getMinLatencyMicros();
9+
10+
long get25thPercentileLatencyMicros();
11+
12+
long get50thPercentileLatencyMicros();
13+
14+
long get75thPercentileLatencyMicros();
15+
16+
void resetStatistics();
17+
}
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
package net.spy.memcached.metrics;
2+
3+
import java.util.concurrent.atomic.LongAdder;
4+
5+
import net.spy.memcached.ArcusMBeanServer;
6+
7+
public final class OpThroughputMonitor implements OpThroughputMonitorMBean {
8+
private static final OpThroughputMonitor INSTANCE = new OpThroughputMonitor();
9+
10+
private final LongAdder completeOps = new LongAdder();
11+
private final LongAdder cancelOps = new LongAdder();
12+
private final LongAdder timeOutOps = new LongAdder();
13+
private final boolean enabled;
14+
15+
private OpThroughputMonitor() {
16+
if (System.getProperty("arcus.mbean", "false").toLowerCase().equals("false")) {
17+
enabled = false;
18+
return;
19+
}
20+
enabled = true;
21+
try {
22+
ArcusMBeanServer mbs = ArcusMBeanServer.getInstance();
23+
mbs.registMBean(this, this.getClass().getPackage().getName()
24+
+ ":type=" + this.getClass().getSimpleName());
25+
} catch (Exception e) {
26+
throw new RuntimeException("Failed to register Throughput MBean", e);
27+
}
28+
}
29+
30+
public static OpThroughputMonitor getInstance() {
31+
return INSTANCE;
32+
}
33+
34+
public void addCompletedOpCount() {
35+
if (!enabled) {
36+
return;
37+
}
38+
completeOps.increment();
39+
}
40+
41+
public void addCanceledOpCount() {
42+
if (!enabled) {
43+
return;
44+
}
45+
cancelOps.increment();
46+
}
47+
48+
public void addTimeOutedOpCount(int count) {
49+
if (!enabled) {
50+
return;
51+
}
52+
timeOutOps.add(count);
53+
}
54+
55+
@Override
56+
public long getCompletedOps() {
57+
return completeOps.sum();
58+
}
59+
60+
@Override
61+
public long getCanceledOps() {
62+
return cancelOps.sum();
63+
}
64+
65+
@Override
66+
public long getTimeoutOps() {
67+
return timeOutOps.sum();
68+
}
69+
70+
@Override
71+
public void resetStatistics() {
72+
completeOps.reset();
73+
cancelOps.reset();
74+
timeOutOps.reset();
75+
}
76+
}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package net.spy.memcached.metrics;
2+
3+
public interface OpThroughputMonitorMBean {
4+
long getCompletedOps();
5+
6+
long getCanceledOps();
7+
8+
long getTimeoutOps();
9+
10+
void resetStatistics();
11+
}

src/main/java/net/spy/memcached/ops/Operation.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,4 +134,8 @@ public interface Operation {
134134
/* ENABLE_MIGRATION end */
135135

136136
APIType getAPIType();
137+
138+
void setStartTime(long startTime);
139+
140+
long getStartTime();
137141
}

src/main/java/net/spy/memcached/protocol/BaseOperationImpl.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import net.spy.memcached.MemcachedReplicaGroup;
2727
import net.spy.memcached.RedirectHandler;
2828
import net.spy.memcached.compat.SpyObject;
29+
import net.spy.memcached.metrics.OpThroughputMonitor;
2930
import net.spy.memcached.ops.APIType;
3031
import net.spy.memcached.ops.CancelledOperationStatus;
3132
import net.spy.memcached.ops.OperationCallback;
@@ -58,6 +59,16 @@ public abstract class BaseOperationImpl extends SpyObject {
5859
private OperationType opType = OperationType.UNDEFINED;
5960
private APIType apiType = APIType.UNDEFINED;
6061

62+
private long startTime;
63+
64+
public void setStartTime(long startTime) {
65+
this.startTime = startTime;
66+
}
67+
68+
public long getStartTime() {
69+
return startTime;
70+
}
71+
6172
/* ENABLE_MIGRATION if */
6273
private RedirectHandler redirectHandler = null;
6374
/* ENABLE_MIGRATION end */
@@ -95,6 +106,7 @@ public final OperationException getException() {
95106
public final boolean cancel(String cause) {
96107
if (callbacked.compareAndSet(false, true)) {
97108
cancelled = true;
109+
OpThroughputMonitor.getInstance().addCanceledOpCount();
98110
if (handlingNode != null) {
99111
cause += " @ " + handlingNode.getNodeName();
100112
}
@@ -222,6 +234,7 @@ protected final void transitionState(OperationState newState) {
222234
}
223235
if (state == OperationState.COMPLETE &&
224236
callbacked.compareAndSet(false, true)) {
237+
OpThroughputMonitor.getInstance().addCompletedOpCount();
225238
callback.complete();
226239
}
227240
}

0 commit comments

Comments
 (0)