feat(kafka): add MessageBroker/Kafka/Cluster/{id}/Topic/{topic} metrics#1747
Draft
shashank-reddy-nr wants to merge 2 commits into
Draft
feat(kafka): add MessageBroker/Kafka/Cluster/{id}/Topic/{topic} metrics#1747shashank-reddy-nr wants to merge 2 commits into
shashank-reddy-nr wants to merge 2 commits into
Conversation
Adds per-cluster, per-topic produce and consume metrics that uniquely identify
Kafka clusters by their UUID (cluster ID). These complement the existing per-node
MessageBroker/Kafka/Nodes/{server}/... metrics by collapsing all broker addresses
of the same cluster into a single metric, enabling cluster-level throughput
analysis across MSK, Confluent Cloud, and self-hosted Kafka.
Metric format:
MessageBroker/Kafka/Cluster/{cluster_id}/Topic/{topic_name}/Produce
MessageBroker/Kafka/Cluster/{cluster_id}/Topic/{topic_name}/Consume
The cluster ID is fetched automatically using the client's own authenticated
connection — no extra configuration or credentials needed.
Also includes:
- Unit and integration tests for all new code paths
- Bug fixes identified in code review (volatile fields, thread-safety,
per-message vs per-poll counting, auth config passthrough)
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## main #1747 +/- ##
==========================================
- Coverage 82.08% 81.99% -0.10%
==========================================
Files 215 215
Lines 26309 26445 +136
Branches 4150 4175 +25
==========================================
+ Hits 21596 21683 +87
- Misses 3301 3332 +31
- Partials 1412 1430 +18 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
- .NET: Replace System.Text.Json (unavailable on net462 without NuGet) with simple string extraction for newrelic header parsing — no external dependency - Node.js: Fix lint violations — empty catch blocks now log debug messages, extract injectHeaders() helper to reduce cognitive complexity, remove unused variables from test, add JSDoc @param description - Python: Remove proactive cluster ID fetch from KafkaConsumer.__init__ to fix race condition where daemon thread overwrote seeded test fixture values; reactive fetch on first consumed message is sufficient Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
What
Adds per-cluster, per-topic metrics for both kafka-python-ng and confluent-kafka.
kafka-python-ng (messagebroker_kafkapython.py)
thread with double-checked-locking + sentinel ("") to prevent duplicate spawns.
credentials inherited, consumer-only keys excluded to prevent TypeError.
init (symmetric with producer).
wrap_kafkaconsumer_next using _cache_key (not
key) — no message routingkey corruption.
confluent-kafka (messagebroker_confluentkafka.py)
_nr_bootstrap_servers before fetch for SerializingProducer and
DeserializingConsumer (was missing, caused uncached thread spawns).
Tests
allowlist, mock target uses kafka.KafkaAdminClient.
NR staging (account 11833718).
Before contributing, please read our contributing guidelines and code of conduct.
Overview
Describe the changes present in the pull request
Related Github Issue
Include a link to the related GitHub issue, if applicable
Testing
The agent includes a suite of tests which should be used to
verify your changes don't break existing functionality. These tests will run with
Github Actions when a pull request is made. More details on running the tests locally can be found in our
testing guidelines,
For most contributions it is strongly recommended to add additional tests which
exercise your changes.