DISCLAIMER: This project is for demonstration purposes only. Using the concept in production is highly discouraged. Use at your own risk.
Change to folder cluster.
Start the containers by running:
docker-compose up -dStopping the containers:
docker-compose downCleaning up (CAREFUL: THIS WILL DELETE ALL UNUSED VOLUMES):
docker volumes pruneChange to the client sub folder and initialize by running (might not be necessary as gradlew is part of the repository and might be up-to-date):
gradle wrapperBuild a shadow jar (which contains all required jars) by running:
./gradlew shadowJarIn the client sub folder you can use gradle to run the producer, either using the shadow jar:
./gradlew -p producer-7.2.1 runShadowor directly (relying on gradles runtime classpath):
./gradlew -p producer-7.2.1 run --args='../configuration/dev.properties ../input.txt'You can also run the shadowJar directly:
java -jar producer-7.2.1/build/libs/producer-7.2.1.jar configuration/dev.properties input.txtYou can also use the same producer code but pacakged with the (almost) latest version of the Kafka library, by using the commands above but replace the folder name producer-7.2.1 with producer-latest.
Just for testing the setup, you can consume the messages by running (in the client folder):
kafka-avro-console-consumer --bootstrap-server broker:9092 --consumer.config configuration/dev.properties --topic message --from-beginning| Tool | URL | Note |
|---|---|---|
Kafka Broker |
||
Prometheus |
||
Grafana |
Login: "kafka", Password: "kafka" |
|
JMX Exporter for the Kafka Broker |
||
Prometheus Node Exporter |
Note, the docker setup already sets the log level of the request logger to debug. The following explains how to set that log level at runtime.
Show dynamic configs for the broker (broker id is 1 thus we use entity name 1):
docker compose exec broker kafka-configs --bootstrap-server broker:9092 --describe \
--entity-type broker-loggers --entity-name 1Set debug level to DEBUG:
docker compose exec broker kafka-configs --bootstrap-server broker:9092 --alter \
--add-config "kafka.request.logger=DEBUG" \
--entity-type broker-loggers --entity-name 1Check which JMX metrics are available on the broker (you might need to add broker with hostname 127.0.0.1 to your /etc/hosts temporarily):
jconsole broker:10091The MBean kafka.server:clientSoftwareName=(client-software-name),clientSoftwareVersion=(client-software-version),listener=(listener),networkProcessor=(processor-index),type=(type) provides information about the used client library and the version.
In Prometheus, have a look at the metric called kafka_server_socketservermetrics_connections (as configured in the JMX exporter for the broker metrics). Please note that you won’t see any client ID, principal, or similar. You just see the name of the library and the version. Thus, this MBean is not useful for identifying the users of outdated versions of Kafka libraries.
Switch of debug logging:
docker compose exec broker kafka-configs --bootstrap-server broker:9092 --alter \
--delete-config kafka.request.logger \
--entity-type broker-loggers --entity-name 1You can follow the logs of the docker container running the Kafka broker by running this command in the cluster subfolder (exit with Ctrl-c):
docker-compose logs -f kafkaWhenever a client connects, lines similar to the following will appear in the docker log of the kafka broker.
broker | [2023-05-31 17:24:11,538] INFO Completed request:{"isForwarded":false,"requestHeader":{"requestApiKey":18,"requestApiVersion":3,"correlationId":0,"clientId":"producer-7.2.1","requestApiKeyName":"API_VERSIONS"},"request":{"clientSoftwareName":"apache-kafka-java","clientSoftwareVersion":"7.2.1-ccs"},"response":{"errorCode":0,"apiKeys":[{"apiKey":0,"minVersion":0,"maxVersion":9},{"apiKey":1,"minVersion":0,"maxVersion":13},{"apiKey":2,"minVersion":0,"maxVersion":7},{"apiKey":3,"minVersion":0,"maxVersion":12},{"apiKey":4,"minVersion":0,"maxVersion":6},{"apiKey":5,"minVersion":0,"maxVersion":3},{"apiKey":6,"minVersion":0,"maxVersion":7},{"apiKey":7,"minVersion":0,"maxVersion":3},{"apiKey":8,"minVersion":0,"maxVersion":8},{"apiKey":9,"minVersion":0,"maxVersion":8},{"apiKey":10,"minVersion":0,"maxVersion":4},{"apiKey":11,"minVersion":0,"maxVersion":9},{"apiKey":12,"minVersion":0,"maxVersion":4},{"apiKey":13,"minVersion":0,"maxVersion":5},{"apiKey":14,"minVersion":0,"maxVersion":5},{"apiKey":15,"minVersion":0,"maxVersion":5},{"apiKey":16,"minVersion":0,"maxVersion":4},{"apiKey":17,"minVersion":0,"maxVersion":1},{"apiKey":18,"minVersion":0,"maxVersion":3},{"apiKey":19,"minVersion":0,"maxVersion":7},{"apiKey":20,"minVersion":0,"maxVersion":6},{"apiKey":21,"minVersion":0,"maxVersion":2},{"apiKey":22,"minVersion":0,"maxVersion":4},{"apiKey":23,"minVersion":0,"maxVersion":4},{"apiKey":24,"minVersion":0,"maxVersion":3},{"apiKey":25,"minVersion":0,"maxVersion":3},{"apiKey":26,"minVersion":0,"maxVersion":3},{"apiKey":27,"minVersion":0,"maxVersion":1},{"apiKey":28,"minVersion":0,"maxVersion":3},{"apiKey":29,"minVersion":0,"maxVersion":2},{"apiKey":30,"minVersion":0,"maxVersion":2},{"apiKey":31,"minVersion":0,"maxVersion":2},{"apiKey":32,"minVersion":0,"maxVersion":4},{"apiKey":33,"minVersion":0,"maxVersion":2},{"apiKey":34,"minVersion":0,"maxVersion":2},{"apiKey":35,"minVersion":0,"maxVersion":3},{"apiKey":36,"minVersion":0,"maxVersion":2},{"apiKey":37,"minVersion":0,"maxVersion":3},{"apiKey":38,"minVersion":0,"maxVersion":2},{"apiKey":39,"minVersion":0,"maxVersion":2},{"apiKey":40,"minVersion":0,"maxVersion":2},{"apiKey":41,"minVersion":0,"maxVersion":2},{"apiKey":42,"minVersion":0,"maxVersion":2},{"apiKey":43,"minVersion":0,"maxVersion":2},{"apiKey":44,"minVersion":0,"maxVersion":1},{"apiKey":45,"minVersion":0,"maxVersion":0},{"apiKey":46,"minVersion":0,"maxVersion":0},{"apiKey":47,"minVersion":0,"maxVersion":0},{"apiKey":48,"minVersion":0,"maxVersion":1},{"apiKey":49,"minVersion":0,"maxVersion":1},{"apiKey":50,"minVersion":0,"maxVersion":0},{"apiKey":51,"minVersion":0,"maxVersion":0},{"apiKey":56,"minVersion":0,"maxVersion":1},{"apiKey":57,"minVersion":0,"maxVersion":1},{"apiKey":60,"minVersion":0,"maxVersion":0},{"apiKey":61,"minVersion":0,"maxVersion":0},{"apiKey":65,"minVersion":0,"maxVersion":0},{"apiKey":66,"minVersion":0,"maxVersion":0},{"apiKey":67,"minVersion":0,"maxVersion":0},{"apiKey":10000,"minVersion":0,"maxVersion":3},{"apiKey":10001,"minVersion":0,"maxVersion":1},{"apiKey":10002,"minVersion":0,"maxVersion":3},{"apiKey":10003,"minVersion":0,"maxVersion":3},{"apiKey":10004,"minVersion":0,"maxVersion":1},{"apiKey":10005,"minVersion":0,"maxVersion":0},{"apiKey":10006,"minVersion":0,"maxVersion":3},{"apiKey":10007,"minVersion":0,"maxVersion":2},{"apiKey":10008,"minVersion":0,"maxVersion":1},{"apiKey":10009,"minVersion":0,"maxVersion":2},{"apiKey":10010,"minVersion":0,"maxVersion":0},{"apiKey":10011,"minVersion":0,"maxVersion":1},{"apiKey":10012,"minVersion":0,"maxVersion":0},{"apiKey":10013,"minVersion":0,"maxVersion":1},{"apiKey":10014,"minVersion":0,"maxVersion":1},{"apiKey":10015,"minVersion":0,"maxVersion":0},{"apiKey":10016,"minVersion":0,"maxVersion":0},{"apiKey":10017,"minVersion":0,"maxVersion":0},{"apiKey":10018,"minVersion":0,"maxVersion":0},{"apiKey":10019,"minVersion":0,"maxVersion":0},{"apiKey":10020,"minVersion":0,"maxVersion":0},{"apiKey":10021,"minVersion":0,"maxVersion":0},{"apiKey":10022,"minVersion":0,"maxVersion":1}],"throttleTimeMs":0,"finalizedFeaturesEpoch":0},"connection":"172.18.0.9:10092-172.18.0.1:55030-3","clientAddress":"172.18.0.1","totalTimeMs":0.28,"requestQueueTimeMs":0.096,"localTimeMs":0.008,"remoteTimeMs":0.092,"throttleTimeMs":0,"responseQueueTimeMs":0.022,"sendTimeMs":0.06,"securityProtocol":"PLAINTEXT","principal":{"class":"KafkaPrincipal","type":"User","name":"ANONYMOUS","tokenAuthenticated":false},"listener":"OUT_DOCKER","clientInformation":{"softwareName":"unknown","softwareVersion":"unknown"},"isDisconnectedClient":false} (kafka.request.logger)The client ID and the client software version can easily be extracted from those lines.
Note that most of the other debug messages are filtered by the log4j configuration file, that is generated when the docker container running the Kafka broker starts using the template found in cluster/docker/kafka/log4j.properties.template.
The result of rendering that template is the following:
log4j.rootLogger=WARN, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
# Allow everthing with log level FATAL, ERROR and WARN
log4j.appender.stdout.filter.01=org.apache.log4j.varia.LevelMatchFilter
log4j.appender.stdout.filter.01.level=FATAL
log4j.appender.stdout.filter.01.AcceptOnMatch=true
log4j.appender.stdout.filter.02=org.apache.log4j.varia.LevelMatchFilter
log4j.appender.stdout.filter.02.level=ERROR
log4j.appender.stdout.filter.02.AcceptOnMatch=true
log4j.appender.stdout.filter.03=org.apache.log4j.varia.LevelMatchFilter
log4j.appender.stdout.filter.03.level=WARN
log4j.appender.stdout.filter.03.AcceptOnMatch=true
# Allow certain request log debug information: We just want to see initial version information
log4j.appender.stdout.filter.04=org.apache.log4j.varia.StringMatchFilter
log4j.appender.stdout.filter.04.StringToMatch="requestApiKeyName":"API_VERSIONS"
log4j.appender.stdout.filter.04.AcceptOnMatch=true
# Filter all other unwanted request log debug information
log4j.appender.stdout.filter.05=org.apache.log4j.varia.StringMatchFilter
log4j.appender.stdout.filter.05.StringToMatch=Completed request
log4j.appender.stdout.filter.05.AcceptOnMatch=false
# Filter internal traffic
log4j.appender.stdout.filter.06=org.apache.log4j.varia.StringMatchFilter
log4j.appender.stdout.filter.06.StringToMatch="listener":"IN_DOCKER"
log4j.appender.stdout.filter.06.AcceptOnMatch=false
log4j.appender.stdout.filter.07=org.apache.log4j.varia.StringMatchFilter
log4j.appender.stdout.filter.07.StringToMatch="listener":"INTERNAL"
log4j.appender.stdout.filter.07.AcceptOnMatch=false
INFO Completed request
log4j.logger.kafka=INFO
log4j.logger.kafka.network.RequestChannel$=WARN
log4j.logger.kafka.producer.async.DefaultEventHandler=DEBUG
log4j.logger.kafka.request.logger=DEBUG
log4j.logger.kafka.controller=TRACE
log4j.logger.kafka.log.LogCleaner=INFO
log4j.logger.state.change.logger=TRACE
log4j.logger.kafka.authorizer.logger=WARNThe magic is to use the filter to allow all critical events first (FATAL, ERROR, WARN). Then all lines where the API_Version is requested are allowed. Finally, all other debug messages are filtered.
The docker-based Kafka cluster setup can to be updated manually. Potentially, updating the version of CP in the cluster/.env file is sufficient.
The Java libraries can be updated manually, but with support by the automatic dependency check provided by the Gradle Version Plugin. Just run the following to identify outdated libraries:
./gradlew dependencyUpdatesGradle plugins versions are managed centrally in gradle.properties so they need to be updated there.
Almost all Java library versions are managed in the gradle config files in buildSrc/src/main/kotlin. Only the kafka client version is managed directly in each subprojet in the build.gradle.kts for better flexibility. For the producer-7.2.1 example, all Kafka-specific libraries should be kept as-is, for obvious reasons.
Until KIP-714 is supported in Confluent Platform, the only way to map client software names and versions to the corresponding clientIds or principals (the latter has not been demonstrated in this demo) is by enabling the DEBUG log level for the Kafka request logger and monitor the requests.