Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions newrelic/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -3111,6 +3111,9 @@ def _process_module_builtin_defaults():
_process_module_definition(
"kafka.coordinator.heartbeat", "newrelic.hooks.messagebroker_kafkapython", "instrument_kafka_heartbeat"
)
_process_module_definition(
"kafka.cluster", "newrelic.hooks.messagebroker_kafkapython", "instrument_kafka_cluster"
)
_process_module_definition("kombu.messaging", "newrelic.hooks.messagebroker_kombu", "instrument_kombu_messaging")
_process_module_definition(
"kombu.serialization", "newrelic.hooks.messagebroker_kombu", "instrument_kombu_serializaion"
Expand Down
90 changes: 90 additions & 0 deletions newrelic/hooks/messagebroker_confluentkafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# limitations under the License.
import logging
import sys
import threading

from newrelic.api.application import application_instance
from newrelic.api.error_trace import wrap_error_trace
Expand All @@ -34,6 +35,55 @@
HEARTBEAT_POLL_TIMEOUT = "MessageBroker/Kafka/Heartbeat/PollTimeout"


# Module-level cache: bootstrap_servers_string → cluster_id.
# Prevents multiple producers/consumers sharing the same brokers from launching
# concurrent list_topics() calls. Empty-string sentinel marks an in-flight fetch.
_nr_cluster_id_cache = {}
_nr_cluster_id_cache_lock = threading.Lock()


def _fetch_cluster_id(instance):
"""Fetch cluster ID using the instance's own librdkafka connection (no extra auth needed).

list_topics() reuses the existing authenticated connection — no separate AdminClient.
Runs in a daemon thread to avoid blocking the hot path.

A module-level cache keyed by bootstrap.servers ensures only one fetch is ever
in-flight per unique broker set, regardless of how many producers/consumers are
created with those brokers.
"""
servers = getattr(instance, "_nr_bootstrap_servers", None)
cache_key = ",".join(servers) if servers else None

if cache_key:
with _nr_cluster_id_cache_lock:
cached = _nr_cluster_id_cache.get(cache_key)
if cached is not None:
# Already resolved — reuse the cached value directly.
if cached:
instance._nr_cluster_id = cached
return
# Set sentinel to prevent duplicate fetches while this one is in-flight.
_nr_cluster_id_cache[cache_key] = ""

def _run():
try:
meta = instance.list_topics(timeout=5)
cluster_id = getattr(meta, "cluster_id", None)
if cluster_id:
instance._nr_cluster_id = cluster_id
if cache_key:
with _nr_cluster_id_cache_lock:
_nr_cluster_id_cache[cache_key] = cluster_id
except Exception:
# Remove sentinel on failure so a future instance can retry.
if cache_key:
with _nr_cluster_id_cache_lock:
_nr_cluster_id_cache.pop(cache_key, None)

threading.Thread(target=_run, daemon=True, name="NR-Kafka-ClusterId").start()


def wrap_Producer_produce(wrapped, instance, args, kwargs):
transaction = current_transaction()
if transaction is None:
Expand Down Expand Up @@ -63,6 +113,12 @@ def wrap_Producer_produce(wrapped, instance, args, kwargs):
for server_name in instance._nr_bootstrap_servers:
transaction.record_custom_metric(f"MessageBroker/Kafka/Nodes/{server_name}/Produce/{topic}", 1)

cluster_id = getattr(instance, "_nr_cluster_id", None)
if cluster_id:
transaction.record_custom_metric(
f"MessageBroker/Kafka/Cluster/{cluster_id}/Topic/{topic}/Produce", 1
)

with MessageTrace(
library="Kafka", operation="Produce", destination_type="Topic", destination_name=topic, source=wrapped
):
Expand Down Expand Up @@ -171,6 +227,11 @@ def wrap_Consumer_poll(wrapped, instance, args, kwargs):
transaction.record_custom_metric(
f"MessageBroker/Kafka/Nodes/{server_name}/Consume/{destination_name}", 1
)
cluster_id = getattr(instance, "_nr_cluster_id", None)
if cluster_id:
transaction.record_custom_metric(
f"MessageBroker/Kafka/Cluster/{cluster_id}/Topic/{destination_name}/Consume", 1
)
transaction.add_messagebroker_info("Confluent-Kafka", get_package_version("confluent-kafka"))

return record
Expand Down Expand Up @@ -213,6 +274,20 @@ def wrap_SerializingProducer_init(wrapped, instance, args, kwargs):
if hasattr(instance, "_value_serializer") and callable(instance._value_serializer):
instance._value_serializer = wrap_serializer("Serialization/Value", "MessageBroker")(instance._value_serializer)

# Set _nr_bootstrap_servers before calling _fetch_cluster_id so that the cache
# key is populated and deduplication works correctly. Without this the fetch
# runs with servers=None → cache_key=None → no sentinel → no dedup → every
# SerializingProducer construction spawns an uncached AdminClient thread.
try:
conf = kwargs.get("conf") or (args[0] if args else {})
servers = conf.get("bootstrap.servers") if isinstance(conf, dict) else None
if servers:
instance._nr_bootstrap_servers = servers.split(",")
except Exception:
pass

_fetch_cluster_id(instance)


def wrap_DeserializingConsumer_init(wrapped, instance, args, kwargs):
wrapped(*args, **kwargs)
Expand All @@ -223,6 +298,17 @@ def wrap_DeserializingConsumer_init(wrapped, instance, args, kwargs):
if hasattr(instance, "_value_deserializer") and callable(instance._value_deserializer):
instance._value_deserializer = wrap_serializer("Deserialization/Value", "Message")(instance._value_deserializer)

# Same fix as wrap_SerializingProducer_init — set bootstrap servers before fetch.
try:
conf = kwargs.get("conf") or (args[0] if args else {})
servers = conf.get("bootstrap.servers") if isinstance(conf, dict) else None
if servers:
instance._nr_bootstrap_servers = servers.split(",")
except Exception:
pass

_fetch_cluster_id(instance)


def wrap_Producer_init(wrapped, instance, args, kwargs):
wrapped(*args, **kwargs)
Expand All @@ -236,6 +322,8 @@ def wrap_Producer_init(wrapped, instance, args, kwargs):
except Exception:
pass

_fetch_cluster_id(instance)


def wrap_Consumer_init(wrapped, instance, args, kwargs):
wrapped(*args, **kwargs)
Expand All @@ -249,6 +337,8 @@ def wrap_Consumer_init(wrapped, instance, args, kwargs):
except Exception:
pass

_fetch_cluster_id(instance)


def wrap_immutable_class(module, class_name):
# Wrap immutable binary extension class with a mutable Python subclass
Expand Down
Loading
Loading