Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ dependencies {
compileOnly group: 'org.apache.kafka', name: 'kafka_2.13', version: '4.0.0'
compileOnly group: 'com.typesafe.scala-logging', name: 'scala-logging_2.13', version: '3.9.5'
implementation group: 'com.fasterxml.jackson.module', name: 'jackson-module-scala_2.13', version: '2.16.2'
implementation group: 'com.google.guava', name: 'guava', version: '32.1.3-jre'
implementation group: 'com.github.ben-manes.caffeine', name: 'caffeine', version: '3.2.0'

testImplementation group: 'org.scalatest', name: 'scalatest_2.13', version: '3.2.17'
testImplementation group: 'org.scalatestplus', name: 'junit-4-13_2.13', version: '3.2.17.0'
Expand All @@ -40,7 +40,7 @@ dependencies {
shadowJar {
dependencies {
exclude(dependency {
!(it.moduleGroup in ['org.openpolicyagent.kafka', 'com.google.guava']
!(it.moduleGroup in ['org.openpolicyagent.kafka', 'com.github.ben-manes.caffeine']
|| (it.moduleGroup == 'com.fasterxml.jackson.module' && it.moduleName == 'jackson-module-scala_2.13')
|| (it.moduleGroup == 'com.thoughtworks.paranamer' && it.moduleName == 'paranamer'))
})
Expand Down
117 changes: 74 additions & 43 deletions src/main/scala/org/openpolicyagent/kafka/OpaAuthorizer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@ import com.fasterxml.jackson.databind.json.JsonMapper
import com.fasterxml.jackson.databind.module.SimpleModule
import com.fasterxml.jackson.databind.{JsonSerializer, SerializerProvider}
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.google.common.annotations.VisibleForTesting
import com.google.common.cache.{Cache, CacheBuilder}
import com.github.benmanes.caffeine.cache.Caffeine
import com.typesafe.scalalogging.LazyLogging
import org.apache.kafka.common.Endpoint
import org.apache.kafka.common.acl.{AclBinding, AclBindingFilter, AclOperation}
Expand All @@ -19,14 +18,15 @@ import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourceT
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.server.authorizer._

import java.io.{File, FileInputStream}
import java.io.{File, FileInputStream, IOException}
import java.net.http.HttpRequest.BodyPublishers
import java.net.http.HttpResponse.BodyHandlers
import java.net.http.{HttpClient, HttpRequest}
import java.net.{URI, URL}
import java.security.KeyStore
import java.time.Duration.ofSeconds
import java.util.concurrent._
import java.util.function.{Function => JavaFunction}
import javax.net.ssl.{KeyManagerFactory, SSLContext, TrustManagerFactory}
import scala.jdk.CollectionConverters._

Expand All @@ -43,17 +43,19 @@ class OpaAuthorizer extends Authorizer with LazyLogging {

private var metrics: Option[Metrics] = None

private lazy val cache = CacheBuilder.newBuilder
private lazy val cache = Caffeine
.newBuilder()
.initialCapacity(config.getOrElse("opa.authorizer.cache.initial.capacity", "5000").toInt)
.maximumSize(maxCacheCapacity)
.expireAfterWrite(config.getOrElse("opa.authorizer.cache.expire.after.seconds", "3600").toInt, TimeUnit.SECONDS)
.recordStats()
.build
.asInstanceOf[Cache[CacheableRequest, Boolean]]
.build[CacheableRequest, Boolean]

override def authorize(requestContext: AuthorizableRequestContext, actions: java.util.List[Action]): java.util.List[AuthorizationResult] = {
actions.asScala.map { action => authorizeAction(requestContext, action) }.asJava
}
override def authorize(
requestContext: AuthorizableRequestContext,
actions: java.util.List[Action]
): java.util.List[AuthorizationResult] =
actions.asScala.map(action => authorizeAction(requestContext, action)).asJava

override def configure(configs: java.util.Map[String, _]): Unit = {
logger.debug(s"Call to configure() with config $configs")
Expand Down Expand Up @@ -92,37 +94,47 @@ class OpaAuthorizer extends Authorizer with LazyLogging {

// Not really used but has to be implemented for internal stuff. Can maybe be used to check OPA connectivity?
// Just doing the same as the acl authorizer does here: https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/security/authorizer/AclAuthorizer.scala#L185
override def start(authorizerServerInfo: AuthorizerServerInfo): java.util.Map[Endpoint, _ <: CompletionStage[Void] ] = {
override def start(
authorizerServerInfo: AuthorizerServerInfo
): java.util.Map[Endpoint, _ <: CompletionStage[Void]] = {
maybeSetupMetrics(authorizerServerInfo.clusterResource().clusterId(), authorizerServerInfo.brokerId())
authorizerServerInfo.endpoints.asScala.map { endpoint =>
endpoint -> CompletableFuture.completedFuture[Void](null) }.toMap.asJava
authorizerServerInfo.endpoints.asScala
.map { endpoint =>
endpoint -> CompletableFuture.completedFuture[Void](null)
}
.toMap
.asJava
}

private[kafka] def maybeSetupMetrics(clusterId: String, brokerId: Int): Unit ={
private[kafka] def maybeSetupMetrics(clusterId: String, brokerId: Int): Unit = {
val isEnabled = config.getOrElse("opa.authorizer.metrics.enabled", "false").toBoolean
if (isEnabled){
if (isEnabled) {
metrics = Option(new Metrics())
val jmxReporter = new JmxReporter()
jmxReporter.contextChange(createMetricsContext(clusterId, brokerId))
metrics.get.addReporter(jmxReporter)

val authorizedRequestName = metrics.get.metricName(MetricsLabel.AUTHORIZED_REQUEST_COUNT, MetricsLabel.RESULT_GROUP)
val authorizedRequestName =
metrics.get.metricName(MetricsLabel.AUTHORIZED_REQUEST_COUNT, MetricsLabel.RESULT_GROUP)
val authorizedRequestSensor = metrics.get.sensor(MetricsLabel.AUTHORIZED_REQUEST_COUNT)
authorizedRequestSensor.add(authorizedRequestName, new CumulativeCount())

val unauthorizedRequestName = metrics.get.metricName(MetricsLabel.UNAUTHORIZED_REQUEST_COUNT, MetricsLabel.RESULT_GROUP)
val unauthorizedRequestName =
metrics.get.metricName(MetricsLabel.UNAUTHORIZED_REQUEST_COUNT, MetricsLabel.RESULT_GROUP)
val unauthorizedRequestSensor = metrics.get.sensor(MetricsLabel.UNAUTHORIZED_REQUEST_COUNT)
unauthorizedRequestSensor.add(unauthorizedRequestName, new CumulativeCount())

val requestToOPAName = metrics.get.metricName(MetricsLabel.REQUEST_TO_OPA_COUNT, MetricsLabel.REQUEST_HANDLE_GROUP)
val requestToOPAName =
metrics.get.metricName(MetricsLabel.REQUEST_TO_OPA_COUNT, MetricsLabel.REQUEST_HANDLE_GROUP)
val requestToOPASensor = metrics.get.sensor(MetricsLabel.REQUEST_TO_OPA_COUNT)
requestToOPASensor.add(requestToOPAName, new CumulativeCount())

val cacheHitName = metrics.get.metricName(MetricsLabel.CACHE_HIT_RATE, MetricsLabel.REQUEST_HANDLE_GROUP)
val cacheHitSensor = metrics.get.sensor(MetricsLabel.CACHE_HIT_RATE)
cacheHitSensor.add(cacheHitName, new Value())

val cacheUsageName = metrics.get.metricName(MetricsLabel.CACHE_USAGE_PERCENTAGE, MetricsLabel.REQUEST_HANDLE_GROUP)
val cacheUsageName =
metrics.get.metricName(MetricsLabel.CACHE_USAGE_PERCENTAGE, MetricsLabel.REQUEST_HANDLE_GROUP)
val cacheUsageSensor = metrics.get.sensor(MetricsLabel.CACHE_USAGE_PERCENTAGE)
cacheUsageSensor.add(cacheUsageName, new Value())

Expand All @@ -138,15 +150,21 @@ class OpaAuthorizer extends Authorizer with LazyLogging {
new KafkaMetricsContext(prefix, contextLabels)
}


@VisibleForTesting
private[kafka] def getCache = cache

// None of the below needs implementations
override def close(): Unit = { }
override def acls(acls: AclBindingFilter): java.lang.Iterable[AclBinding] = ???
override def deleteAcls(requestContext: AuthorizableRequestContext, aclBindingFilters: java.util.List[AclBindingFilter]): java.util.List[_ <: CompletionStage[AclDeleteResult]] = ???
override def createAcls(requestContext: AuthorizableRequestContext, aclBindings: java.util.List[AclBinding]): java.util.List[_ <: CompletionStage[AclCreateResult]] = ???

override def deleteAcls(
requestContext: AuthorizableRequestContext,
aclBindingFilters: java.util.List[AclBindingFilter]
): java.util.List[_ <: CompletionStage[AclDeleteResult]] = ???

override def createAcls(
requestContext: AuthorizableRequestContext,
aclBindings: java.util.List[AclBinding]
): java.util.List[_ <: CompletionStage[AclCreateResult]] = ???

private def authorizeAction(requestContext: AuthorizableRequestContext, action: Action): AuthorizationResult = {
val resource = action.resourcePattern
Expand All @@ -158,57 +176,60 @@ class OpaAuthorizer extends Authorizer with LazyLogging {

if(metrics.isDefined){
metrics.get.sensor(MetricsLabel.CACHE_HIT_RATE).record(cache.stats().hitRate())
metrics.get.sensor(MetricsLabel.CACHE_USAGE_PERCENTAGE).record(cache.size() / maxCacheCapacity.toDouble)
metrics.get.sensor(MetricsLabel.CACHE_USAGE_PERCENTAGE).record(cache.estimatedSize() / maxCacheCapacity.toDouble)
result match {
case AuthorizationResult.DENIED => metrics.get.sensor(MetricsLabel.UNAUTHORIZED_REQUEST_COUNT).record()
case AuthorizationResult.DENIED => metrics.get.sensor(MetricsLabel.UNAUTHORIZED_REQUEST_COUNT).record()
case AuthorizationResult.ALLOWED => metrics.get.sensor(MetricsLabel.AUTHORIZED_REQUEST_COUNT).record()
}
}

result
}

override def authorizeByResourceType(requestContext: AuthorizableRequestContext, op: AclOperation, resourceType: ResourceType): AuthorizationResult = {
doAuthorize(requestContext, new Action(op, new ResourcePattern(resourceType, "", PatternType.PREFIXED), 0, true, true))
}
override def authorizeByResourceType(
requestContext: AuthorizableRequestContext,
op: AclOperation,
resourceType: ResourceType
): AuthorizationResult =
doAuthorize(
requestContext,
new Action(op, new ResourcePattern(resourceType, "", PatternType.PREFIXED), 0, true, true)
)

private def doAuthorize(requestContext: AuthorizableRequestContext, action: Action) = {
// ensure we compare identical classes
val sessionPrincipal = requestContext.principal
val principal = if (classOf[KafkaPrincipal] != sessionPrincipal.getClass)
new KafkaPrincipal(sessionPrincipal.getPrincipalType, sessionPrincipal.getName)
else
sessionPrincipal
val principal =
if (classOf[KafkaPrincipal] != sessionPrincipal.getClass)
new KafkaPrincipal(sessionPrincipal.getPrincipalType, sessionPrincipal.getName)
else
sessionPrincipal

val host = requestContext.clientAddress.getHostAddress

val cachableRequest = CacheableRequest(principal, action, host)
val request = Request(Input(requestContext, action))

def allowAccess = {
try {
cache.get(cachableRequest, new AllowCallable(request, opaUrl, allowOnError, metrics))
}
def allowAccess =
try cache.get(cachableRequest, new AllowCallable(request, opaUrl, allowOnError, metrics))
catch {
case e: ExecutionException =>
case e: Exception =>
logger.warn(s"Exception in decision retrieval: ${e.getMessage}")
logger.trace("Exception trace", e)
allowOnError
}
}

// Evaluate if operation is allowed
val authorized = isSuperUser(principal) || allowAccess

if (authorized) AuthorizationResult.ALLOWED else AuthorizationResult.DENIED
}

def isSuperUser(principal: KafkaPrincipal): Boolean = {
def isSuperUser(principal: KafkaPrincipal): Boolean =
if (superUsers.contains(principal.toString)) {
logger.trace(s"User ${principal} is super user")
logger.trace(s"User $principal is super user")
return true
} else false
}
}

class ResourcePatternSerializer() extends JsonSerializer[ResourcePattern] {
Expand Down Expand Up @@ -297,14 +318,24 @@ object AllowCallable {
.addSerializer(classOf[KafkaPrincipal], new KafkaPrincipalSerializer)
.addSerializer(classOf[RequestHeader], new RequestHeaderSerializer)
.addSerializer(classOf[RequestHeaderData], new RequestHeaderDataSerializer)
private val objectMapper = JsonMapper.builder().addModule(requestSerializerModule).addModule(DefaultScalaModule).build()

private val objectMapper =
JsonMapper.builder().addModule(requestSerializerModule).addModule(DefaultScalaModule).build()

// If a TrusStore is configured by the user,
// This HttpClient is replaced by OpaAuthorizer.configure()
var client = HttpClient.newBuilder.connectTimeout(ofSeconds(5)).build
}
class AllowCallable(request: Request, opaUrl: URI, allowOnError: Boolean, metrics: Option[Metrics]) extends Callable[Boolean] with LazyLogging {
override def call(): Boolean = {

class AllowCallable(
request: Request,
opaUrl: URI,
allowOnError: Boolean,
metrics: Option[Metrics]
) extends JavaFunction[CacheableRequest, Boolean]
with LazyLogging {

override def apply(key: CacheableRequest): Boolean = {
logger.debug(s"Cache miss, querying OPA for decision")
val reqJson = AllowCallable.objectMapper.writeValueAsString(request)
val requestBuilder = HttpRequest.newBuilder.timeout(ofSeconds(5)).header("Content-Type", "application/json")
Expand Down
20 changes: 10 additions & 10 deletions src/test/scala/org/openpolicyagent/kafka/OpaAuthorizerSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ class OpaAuthorizerSpec extends AnyFlatSpec with Matchers with PrivateMethodTest
val request = createRequest("alice-producer", actions)

opaAuthorizer.authorize(request.requestContext, request.actions.asJava) should be (List(AuthorizationResult.ALLOWED).asJava)
opaAuthorizer.getCache.size should be (1)
opaAuthorizer.getCache.estimatedSize() should be (1)
}

"OpaAuthorizer" should "return authorization results for multiple actions in the same request in right order" in {
Expand All @@ -103,7 +103,7 @@ class OpaAuthorizerSpec extends AnyFlatSpec with Matchers with PrivateMethodTest
AuthorizationResult.ALLOWED,
AuthorizationResult.ALLOWED,
AuthorizationResult.ALLOWED).asJava)
opaAuthorizer.getCache.size should be (4)
opaAuthorizer.getCache.estimatedSize() should be (4)
}

"OpaAuthorizer" should "not authorize when username does not match name of topic" in {
Expand All @@ -114,7 +114,7 @@ class OpaAuthorizerSpec extends AnyFlatSpec with Matchers with PrivateMethodTest
val request = createRequest("alice-producer", actions)

opaAuthorizer.authorize(request.requestContext, request.actions.asJava) should be (List(AuthorizationResult.DENIED).asJava)
opaAuthorizer.getCache.size should be (1)
opaAuthorizer.getCache.estimatedSize() should be (1)
}

"OpaAuthorizer" should "not authorize read request for producer" in {
Expand All @@ -125,7 +125,7 @@ class OpaAuthorizerSpec extends AnyFlatSpec with Matchers with PrivateMethodTest
val request = createRequest("alice-producer", actions)

opaAuthorizer.authorize(request.requestContext, request.actions.asJava) should be (List(AuthorizationResult.DENIED).asJava)
opaAuthorizer.getCache.size should be (1)
opaAuthorizer.getCache.estimatedSize() should be (1)
}

"OpaAuthorizer" should "not authorize write request for consumer" in {
Expand All @@ -136,7 +136,7 @@ class OpaAuthorizerSpec extends AnyFlatSpec with Matchers with PrivateMethodTest
val request = createRequest("alice-consumer", actions)

opaAuthorizer.authorize(request.requestContext, request.actions.asJava) should be (List(AuthorizationResult.DENIED).asJava)
opaAuthorizer.getCache.size should be (1)
opaAuthorizer.getCache.estimatedSize() should be (1)
}

"OpaAuthorizer" should "cache the first request" in {
Expand All @@ -150,7 +150,7 @@ class OpaAuthorizerSpec extends AnyFlatSpec with Matchers with PrivateMethodTest
opaAuthorizer.authorize(request.requestContext, request.actions.asJava) should be (List(AuthorizationResult.ALLOWED).asJava)
}

opaAuthorizer.getCache.size should be (1)
opaAuthorizer.getCache.estimatedSize() should be (1)

val otherActions = List(
createAction("bob-topic", AclOperation.READ),
Expand All @@ -161,7 +161,7 @@ class OpaAuthorizerSpec extends AnyFlatSpec with Matchers with PrivateMethodTest
opaAuthorizer.authorize(otherRequest.requestContext, otherRequest.actions.asJava) should be (List(AuthorizationResult.ALLOWED).asJava)
}

opaAuthorizer.getCache.size should be (2)
opaAuthorizer.getCache.estimatedSize() should be (2)
}

"OpaAuthorizer" should "not cache decisions while errors occur" in {
Expand All @@ -172,7 +172,7 @@ class OpaAuthorizerSpec extends AnyFlatSpec with Matchers with PrivateMethodTest
val request = createRequest("alice-consumer", actions)

opaAuthorizer.authorize(request.requestContext, request.actions.asJava) should be (List(AuthorizationResult.DENIED).asJava)
opaAuthorizer.getCache.size should be (0)
opaAuthorizer.getCache.estimatedSize() should be (0)
}

"OpaAuthorizer" should "authorize super users without checking with OPA" in {
Expand Down Expand Up @@ -206,7 +206,7 @@ class OpaAuthorizerSpec extends AnyFlatSpec with Matchers with PrivateMethodTest
new ListenerName("SASL_PLAINTEXT"), SecurityProtocol.SASL_PLAINTEXT, new ClientInformation("rdkafka", "1.0.0"), false)

opaAuthorizer.authorize(requestContext, actions.asJava) should be (List(AuthorizationResult.ALLOWED).asJava)
opaAuthorizer.getCache.size should be (1)
opaAuthorizer.getCache.estimatedSize() should be (1)
}

"OpaAuthorizer" should "set up metrics system if enabled" in {
Expand Down Expand Up @@ -287,7 +287,7 @@ class OpaAuthorizerSpec extends AnyFlatSpec with Matchers with PrivateMethodTest
val cacheUsagePercentageActual = server.getAttribute(
new ObjectName(MetricsLabel.NAMESPACE + ":type=" + MetricsLabel.REQUEST_HANDLE_GROUP),
MetricsLabel.CACHE_USAGE_PERCENTAGE)
assert(cacheUsagePercentageActual == (opaAuthorizer.getCache.size() / defaultCacheCapacity.toDouble))
assert(cacheUsagePercentageActual == (opaAuthorizer.getCache.estimatedSize() / defaultCacheCapacity.toDouble))
}


Expand Down