Skip to content

Commit 60d55cc

Browse files
yangguo-mssenthilm-ms
authored andcommitted
[AzPubSub] Adding tags/dimensions for Authorizer and Authenticator metrics (#64)
1 parent d674395 commit 60d55cc

4 files changed

Lines changed: 185 additions & 20 deletions

File tree

azpubsub/src/main/java/com/microsoft/azpubsub/security/auth/AzPubSubPrincipal.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,17 @@
2525
*/
2626
public class AzPubSubPrincipal extends KafkaPrincipal {
2727
private Set<String> roles;
28+
private String principalStr;
2829

2930
public AzPubSubPrincipal(String principalType, String name, Set<String> roles) {
3031
super(principalType, name);
3132
this.roles = roles;
33+
this.principalStr = "";
34+
if (roles.size() > 1) {
35+
this.principalStr = roles.stream().skip(1).findFirst().orElse("");
36+
} else {
37+
this.principalStr = roles.stream().findFirst().orElse("");
38+
}
3239
}
3340

3441
public Set<String> getRoles() {
@@ -46,4 +53,8 @@ public boolean equals(Object o) {
4653
public int hashCode() {
4754
return super.hashCode();
4855
}
56+
57+
public String getPrincipalName() {
58+
return principalStr;
59+
}
4960
}
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
package com.microsoft.azpubsub.security
2+
3+
import com.yammer.metrics.core.Meter
4+
import kafka.utils.Pool
5+
6+
import java.util.concurrent.TimeUnit
7+
import scala.jdk.CollectionConverters._
8+
9+
class AuthenticatorMetrics(tags: scala.collection.Map[String, String]) extends AzPubSubSecurityMetrics {
10+
11+
case class MeterWrapper(metricType: String, eventType: String) {
12+
@volatile private var lazyMeter: Meter = _
13+
private val meterLock = new Object
14+
15+
def meter(): Meter = {
16+
var meter = lazyMeter
17+
if (meter == null) {
18+
meterLock synchronized {
19+
meter = lazyMeter
20+
if (meter == null) {
21+
meter = newMeter(metricType, eventType, TimeUnit.SECONDS, tags)
22+
lazyMeter = meter
23+
}
24+
}
25+
}
26+
meter
27+
}
28+
29+
if (tags.isEmpty) // greedily initialize the general topic metrics
30+
meter()
31+
}
32+
33+
// an internal map for "lazy initialization" of certain metrics
34+
private val metricTypeMap = new Pool[String, MeterWrapper]
35+
metricTypeMap.putAll(Map(
36+
AuthenticatorStats.AuthenticatorServerSuccessPerSec -> MeterWrapper(AuthenticatorStats.AuthenticatorServerSuccessPerSec, "success"),
37+
AuthenticatorStats.AuthenticatorServerFailurePerSec -> MeterWrapper(AuthenticatorStats.AuthenticatorServerFailurePerSec, "failure"),
38+
AuthenticatorStats.AuthenticatorClientSuccessPerSec -> MeterWrapper(AuthenticatorStats.AuthenticatorClientSuccessPerSec, "success"),
39+
AuthenticatorStats.AuthenticatorClientFailurePerSec -> MeterWrapper(AuthenticatorStats.AuthenticatorClientFailurePerSec, "failure"),
40+
AuthenticatorStats.AuthenticatorClientTrustDisabledPerSec -> MeterWrapper(AuthenticatorStats.AuthenticatorClientTrustDisabledPerSec, "success"),
41+
AuthenticatorStats.AuthenticatorDsmsCertCacheSuccessPerSec -> MeterWrapper(AuthenticatorStats.AuthenticatorDsmsCertCacheSuccessPerSec, "success"),
42+
AuthenticatorStats.AuthenticatorDsmsCertCacheFailurePerSec -> MeterWrapper(AuthenticatorStats.AuthenticatorDsmsCertCacheFailurePerSec, "failure"),
43+
).asJava)
44+
45+
def serverSuccessRate = metricTypeMap.get(AuthenticatorStats.AuthenticatorServerSuccessPerSec).meter()
46+
47+
def serverFailureRate = metricTypeMap.get(AuthenticatorStats.AuthenticatorServerFailurePerSec).meter()
48+
49+
def clientSuccessRate = metricTypeMap.get(AuthenticatorStats.AuthenticatorClientSuccessPerSec).meter()
50+
51+
def clientFailureRate = metricTypeMap.get(AuthenticatorStats.AuthenticatorClientFailurePerSec).meter()
52+
53+
def clientTrustDisabledRate = metricTypeMap.get(AuthenticatorStats.AuthenticatorClientTrustDisabledPerSec).meter()
54+
55+
def dsmsCertCacheSuccessRate = metricTypeMap.get(AuthenticatorStats.AuthenticatorDsmsCertCacheSuccessPerSec).meter()
56+
57+
def dsmsCertCacheFailureRate = metricTypeMap.get(AuthenticatorStats.AuthenticatorDsmsCertCacheFailurePerSec).meter()
58+
}
59+
60+
object AuthenticatorStats {
61+
val AuthenticatorServerSuccessPerSec = "AuthenticatorServerSuccessPerSec"
62+
val AuthenticatorServerFailurePerSec = "AuthenticatorServerFailurePerSec"
63+
val AuthenticatorClientSuccessPerSec = "AuthenticatorClientSuccessPerSec"
64+
val AuthenticatorClientFailurePerSec = "AuthenticatorClientFailurePerSec"
65+
val AuthenticatorClientTrustDisabledPerSec = "AuthenticatorClientTrustDisabledPerSec"
66+
val AuthenticatorDsmsCertCacheSuccessPerSec = "AuthenticatorDsmsCertCacheSuccessPerSec"
67+
val AuthenticatorDsmsCertCacheFailurePerSec = "AuthenticatorDsmsCertCacheFailurePerSec"
68+
}
69+
70+
class AuthenticatorStats {
71+
private val stats = new Pool[String, AuthenticatorMetrics]
72+
73+
def allStats(identity: String, authType: String): AuthenticatorMetrics = {
74+
val identityStr = identity.replaceAll(",", "_").replaceAll("\\\\\\\\", "_").replaceAll("\\\\", "_")
75+
val tags: scala.collection.Map[String, String] = Map("auth-type" -> authType, "identity" -> identityStr)
76+
77+
val key = authType + identityStr
78+
if (!stats.contains(key)) {
79+
stats.put(key, new AuthenticatorMetrics(tags))
80+
}
81+
82+
stats.get(key)
83+
}
84+
}

azpubsub/src/main/scala/com/microsoft/azpubsub/security/auth/AzPubSubAclAuthorizer.scala

Lines changed: 89 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -3,27 +3,25 @@ package com.microsoft.azpubsub.security.auth
33
import com.typesafe.scalalogging.Logger
44
import com.yammer.metrics.core.{Meter, MetricName}
55
import kafka.metrics.KafkaMetricsGroup
6-
import kafka.security.auth._
7-
import kafka.security.authorizer.{AclAuthorizer, AuthorizerUtils}
8-
import kafka.utils.Logging
6+
import kafka.security.authorizer.AclAuthorizer
7+
import kafka.utils.{Logging, Pool}
8+
import org.apache.kafka.common.resource.ResourceType.TOPIC
99
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
1010
import org.apache.kafka.common.utils.Utils
1111
import org.apache.kafka.server.authorizer.{Action, AuthorizableRequestContext, AuthorizationResult}
1212

1313
import java.net.InetAddress
1414
import java.util
1515
import java.util.concurrent._
16-
import scala.collection.JavaConverters._
16+
import scala.jdk.CollectionConverters._
1717

1818
/*
1919
* AzPubSub ACL Authorizer to handle the certificate & role based principal type
2020
*/
21-
class AzPubSubAclAuthorizer extends AclAuthorizer with Logging with KafkaMetricsGroup {
21+
class AzPubSubAclAuthorizer extends AclAuthorizer with Logging {
2222
private[security] val aclAuthorizerLogger = Logger("kafka.authorizer.logger")
2323

24-
private val successRate: Meter = newMeter("AuthorizerSuccessPerSec", "success", TimeUnit.SECONDS)
25-
private val failureRate: Meter = newMeter("AuthorizerFailurePerSec", "failure", TimeUnit.SECONDS)
26-
private val disabledRate: Meter = newMeter("AuthorizerDisabledPerSec", "success", TimeUnit.SECONDS)
24+
private val authorizerStats = new AuthorizerStats
2725

2826
private var authZConfig: AuthZConfig = null
2927

@@ -34,20 +32,23 @@ class AzPubSubAclAuthorizer extends AclAuthorizer with Logging with KafkaMetrics
3432
super.configure(javaConfigs)
3533
}
3634

37-
override def metricName(name: String, metricTags: scala.collection.Map[String, String]): MetricName = {
38-
explicitMetricName("azpubsub.security", "AuthorizerMetrics", name, metricTags)
39-
}
40-
4135
override def authorize(requestContext: AuthorizableRequestContext, actions: util.List[Action]): util.List[AuthorizationResult] = {
4236
actions.asScala.map { action => this.authorizeAction(requestContext, action) }.asJava
4337
}
4438

4539
private def authorizeAction(requestContext: AuthorizableRequestContext, action: Action): AuthorizationResult = {
4640
val resource = action.resourcePattern
47-
if (resource.resourceType == Topic && authZConfig.isDisabled(resource.name)) {
41+
val sessionPrincipal = requestContext.principal
42+
var principalName = sessionPrincipal.getName
43+
if (classOf[AzPubSubPrincipal] == sessionPrincipal.getClass) {
44+
val principal = sessionPrincipal.asInstanceOf[AzPubSubPrincipal]
45+
principalName = principal.getPrincipalName
46+
}
47+
48+
if (resource.resourceType == TOPIC && authZConfig.isDisabled(resource.name)) {
4849
aclAuthorizerLogger.debug(s"AuthZ is disabled for resource: $resource")
49-
successRate.mark()
50-
disabledRate.mark()
50+
authorizerStats.allStats(action, principalName).successRate.mark()
51+
authorizerStats.allStats(action, principalName).disabledRate.mark()
5152
return AuthorizationResult.ALLOWED
5253
}
5354

@@ -64,23 +65,92 @@ class AzPubSubAclAuthorizer extends AclAuthorizer with Logging with KafkaMetrics
6465
}
6566
}
6667

67-
val sessionPrincipal = requestContext.principal
6868
if (classOf[AzPubSubPrincipal] == sessionPrincipal.getClass) {
6969
val principal = sessionPrincipal.asInstanceOf[AzPubSubPrincipal]
7070
for (role <- principal.getRoles.asScala) {
7171
val claimPrincipal = new KafkaPrincipal(principal.getPrincipalType(), role)
7272
val claimRequestContext = getClaimRequestContext(requestContext, claimPrincipal)
7373
if (super.authorize(claimRequestContext, List(action).asJava).asScala.head == AuthorizationResult.ALLOWED) {
74-
successRate.mark()
74+
authorizerStats.allStats(action, claimPrincipal.getName).successRate.mark()
7575
return AuthorizationResult.ALLOWED
7676
}
7777
}
7878
} else if (super.authorize(requestContext, List(action).asJava).asScala.head == AuthorizationResult.ALLOWED) {
79-
successRate.mark()
79+
authorizerStats.allStats(action, principalName).successRate.mark()
8080
return AuthorizationResult.ALLOWED
8181
}
8282

83-
failureRate.mark()
83+
authorizerStats.allStats(action, principalName).failureRate.mark()
8484
return AuthorizationResult.DENIED
8585
}
8686
}
87+
88+
class AuthorizerMetrics(tags: scala.collection.Map[String, String]) extends KafkaMetricsGroup {
89+
90+
case class MeterWrapper(metricType: String, eventType: String) {
91+
@volatile private var lazyMeter: Meter = _
92+
private val meterLock = new Object
93+
94+
def meter(): Meter = {
95+
var meter = lazyMeter
96+
if (meter == null) {
97+
meterLock synchronized {
98+
meter = lazyMeter
99+
if (meter == null) {
100+
meter = newMeter(metricType, eventType, TimeUnit.SECONDS, tags)
101+
lazyMeter = meter
102+
}
103+
}
104+
}
105+
meter
106+
}
107+
108+
if (tags.isEmpty) // greedily initialize the general topic metrics
109+
meter()
110+
}
111+
112+
// an internal map for "lazy initialization" of certain metrics
113+
private val metricTypeMap = new Pool[String, MeterWrapper]
114+
metricTypeMap.putAll(Map(
115+
AuthorizerStats.SuccessPerSec -> MeterWrapper(AuthorizerStats.SuccessPerSec, "success"),
116+
AuthorizerStats.FailurePerSec -> MeterWrapper(AuthorizerStats.FailurePerSec, "failure"),
117+
AuthorizerStats.DisabledPerSec -> MeterWrapper(AuthorizerStats.DisabledPerSec, "success"),
118+
).asJava)
119+
120+
def successRate = metricTypeMap.get(AuthorizerStats.SuccessPerSec).meter()
121+
122+
def failureRate = metricTypeMap.get(AuthorizerStats.FailurePerSec).meter()
123+
124+
def disabledRate = metricTypeMap.get(AuthorizerStats.DisabledPerSec).meter()
125+
126+
override def metricName(name: String, metricTags: scala.collection.Map[String, String]): MetricName = {
127+
explicitMetricName("azpubsub.security", "AuthorizerMetrics", name, metricTags)
128+
}
129+
}
130+
131+
object AuthorizerStats {
132+
val SuccessPerSec = "AuthorizerSuccessPerSec"
133+
val FailurePerSec = "AuthorizerFailurePerSec"
134+
val DisabledPerSec = "AuthorizerDisabledPerSec"
135+
}
136+
137+
class AuthorizerStats {
138+
private val stats = new Pool[String, AuthorizerMetrics]
139+
140+
def allStats(action: Action, identity: String): AuthorizerMetrics = {
141+
val resourceName = action.resourcePattern.name
142+
val resourceType = action.resourcePattern.resourceType.toString
143+
val operation = action.operation.toString
144+
145+
val identityStr = identity.replaceAll(",", "_").replaceAll("\\\\\\\\", "_").replaceAll("\\\\", "_")
146+
val tags: scala.collection.Map[String, String] = Map("resource-name" -> resourceName, "resource-type" -> resourceType,
147+
"operation" -> operation, "identity" -> identityStr)
148+
149+
val key = resourceName + resourceType + operation + identityStr
150+
if (!stats.contains(key)) {
151+
stats.put(key, new AuthorizerMetrics(tags))
152+
}
153+
154+
stats.get(key)
155+
}
156+
}

clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipal.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ public static KafkaPrincipal fromString(String str) {
8282

8383
@Override
8484
public String toString() {
85-
return principalType + ":" + name;
85+
return this.principalType + ":" + this.name;
8686
}
8787

8888
@Override

0 commit comments

Comments
 (0)