Skip to content

feat(kafka): add MessageBroker/Kafka/Cluster/{id}/Topic/{topic} metrics#1747

Draft
shashank-reddy-nr wants to merge 2 commits into
newrelic:mainfrom
shashank-reddy-nr:feat/kafka-cluster-id
Draft

feat(kafka): add MessageBroker/Kafka/Cluster/{id}/Topic/{topic} metrics#1747
shashank-reddy-nr wants to merge 2 commits into
newrelic:mainfrom
shashank-reddy-nr:feat/kafka-cluster-id

Conversation

@shashank-reddy-nr
Copy link
Copy Markdown

What

Adds per-cluster, per-topic metrics for both kafka-python-ng and confluent-kafka.

kafka-python-ng (messagebroker_kafkapython.py)

  • _fetch_kafka_cluster_id(): uses KafkaAdminClient.describe_cluster() in a daemon
    thread with double-checked-locking + sentinel ("") to prevent duplicate spawns.
  • Full config passed via _ADMIN_CLIENT_ALLOWED_KEYS allowlist — SASL/SSL
    credentials inherited, consumer-only keys excluded to prevent TypeError.
  • wrap_KafkaConsumer_init_cluster: proactive cluster ID fetch on KafkaConsumer
    init (symmetric with producer).
  • Cluster metric recorded in both wrap_KafkaProducer_send and
    wrap_kafkaconsumer_next using _cache_key (not key) — no message routing
    key corruption.

confluent-kafka (messagebroker_confluentkafka.py)

  • _fetch_cluster_id() already uses instance.list_topics() — extended to set
    _nr_bootstrap_servers before fetch for SerializingProducer and
    DeserializingConsumer (was missing, caused uncached thread spawns).

Tests

  • Unit: test_cluster_metrics_unit.py — key preservation regression, auth config
    allowlist, mock target uses kafka.KafkaAdminClient.
  • Integration: kafka-python-producer and kafka-python-consumer validated in
    NR staging (account 11833718).

Before contributing, please read our contributing guidelines and code of conduct.

Overview

Describe the changes present in the pull request

Related Github Issue

Include a link to the related GitHub issue, if applicable

Testing

The agent includes a suite of tests which should be used to
verify your changes don't break existing functionality. These tests will run with
Github Actions when a pull request is made. More details on running the tests locally can be found in our
testing guidelines,
For most contributions it is strongly recommended to add additional tests which
exercise your changes.

Adds per-cluster, per-topic produce and consume metrics that uniquely identify
Kafka clusters by their UUID (cluster ID). These complement the existing per-node
MessageBroker/Kafka/Nodes/{server}/... metrics by collapsing all broker addresses
of the same cluster into a single metric, enabling cluster-level throughput
analysis across MSK, Confluent Cloud, and self-hosted Kafka.

Metric format:
  MessageBroker/Kafka/Cluster/{cluster_id}/Topic/{topic_name}/Produce
  MessageBroker/Kafka/Cluster/{cluster_id}/Topic/{topic_name}/Consume

The cluster ID is fetched automatically using the client's own authenticated
connection — no extra configuration or credentials needed.

Also includes:
- Unit and integration tests for all new code paths
- Bug fixes identified in code review (volatile fields, thread-safety,
  per-message vs per-poll counting, auth config passthrough)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@shashank-reddy-nr shashank-reddy-nr requested a review from a team as a code owner June 1, 2026 10:44
@shashank-reddy-nr shashank-reddy-nr marked this pull request as draft June 1, 2026 10:52
@mergify mergify Bot added the tests-failing Tests failing in CI. label Jun 1, 2026
@codecov-commenter
Copy link
Copy Markdown

codecov-commenter commented Jun 1, 2026

Codecov Report

❌ Patch coverage is 68.84058% with 43 lines in your changes missing coverage. Please review.
✅ Project coverage is 81.99%. Comparing base (949339a) to head (f653aa4).

Files with missing lines Patch % Lines
newrelic/hooks/messagebroker_kafkapython.py 65.88% 19 Missing and 10 partials ⚠️
newrelic/hooks/messagebroker_confluentkafka.py 73.07% 8 Missing and 6 partials ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #1747      +/-   ##
==========================================
- Coverage   82.08%   81.99%   -0.10%     
==========================================
  Files         215      215              
  Lines       26309    26445     +136     
  Branches     4150     4175      +25     
==========================================
+ Hits        21596    21683      +87     
- Misses       3301     3332      +31     
- Partials     1412     1430      +18     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

- .NET: Replace System.Text.Json (unavailable on net462 without NuGet) with
  simple string extraction for newrelic header parsing — no external dependency
- Node.js: Fix lint violations — empty catch blocks now log debug messages,
  extract injectHeaders() helper to reduce cognitive complexity, remove unused
  variables from test, add JSDoc @param description
- Python: Remove proactive cluster ID fetch from KafkaConsumer.__init__ to fix
  race condition where daemon thread overwrote seeded test fixture values;
  reactive fetch on first consumed message is sufficient

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

tests-failing Tests failing in CI.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants