From b10aa5de772a680c8f27de4a7ee4bf6c70524b0d Mon Sep 17 00:00:00 2001 From: Pulipelly Shashank Reddy Date: Mon, 1 Jun 2026 14:28:55 +0530 Subject: [PATCH 1/3] feat(kafka): add MessageBroker/Kafka/Cluster/{id}/Topic/{topic} metrics MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds per-cluster, per-topic produce and consume metrics that uniquely identify Kafka clusters by their UUID (cluster ID). These complement the existing per-node MessageBroker/Kafka/Nodes/{server}/... metrics by collapsing all broker addresses of the same cluster into a single metric, enabling cluster-level throughput analysis across MSK, Confluent Cloud, and self-hosted Kafka. Metric format: MessageBroker/Kafka/Cluster/{cluster_id}/Topic/{topic_name}/Produce MessageBroker/Kafka/Cluster/{cluster_id}/Topic/{topic_name}/Consume The cluster ID is fetched automatically using the client's own authenticated connection — no extra configuration or credentials needed. Also includes: - Unit and integration tests for all new code paths - Bug fixes identified in code review (volatile fields, thread-safety, per-message vs per-poll counting, auth config passthrough) Co-Authored-By: Claude Sonnet 4.6 --- newrelic/config.py | 3 + .../hooks/messagebroker_confluentkafka.py | 90 +++++++ newrelic/hooks/messagebroker_kafkapython.py | 228 +++++++++++++++--- .../test_consumer.py | 31 +++ .../test_producer.py | 33 +++ .../test_cluster_metrics_unit.py | 193 +++++++++++++++ .../test_consumer.py | 44 ++++ .../test_producer.py | 49 ++++ 8 files changed, 633 insertions(+), 38 deletions(-) create mode 100644 tests/messagebroker_kafkapython/test_cluster_metrics_unit.py diff --git a/newrelic/config.py b/newrelic/config.py index fb053cd3ff..6aba3ee822 100644 --- a/newrelic/config.py +++ b/newrelic/config.py @@ -3111,6 +3111,9 @@ def _process_module_builtin_defaults(): _process_module_definition( "kafka.coordinator.heartbeat", "newrelic.hooks.messagebroker_kafkapython", "instrument_kafka_heartbeat" ) + _process_module_definition( + "kafka.cluster", "newrelic.hooks.messagebroker_kafkapython", "instrument_kafka_cluster" + ) _process_module_definition("kombu.messaging", "newrelic.hooks.messagebroker_kombu", "instrument_kombu_messaging") _process_module_definition( "kombu.serialization", "newrelic.hooks.messagebroker_kombu", "instrument_kombu_serializaion" diff --git a/newrelic/hooks/messagebroker_confluentkafka.py b/newrelic/hooks/messagebroker_confluentkafka.py index 662e5ba87d..7a0fd4065a 100644 --- a/newrelic/hooks/messagebroker_confluentkafka.py +++ b/newrelic/hooks/messagebroker_confluentkafka.py @@ -13,6 +13,7 @@ # limitations under the License. import logging import sys +import threading from newrelic.api.application import application_instance from newrelic.api.error_trace import wrap_error_trace @@ -34,6 +35,55 @@ HEARTBEAT_POLL_TIMEOUT = "MessageBroker/Kafka/Heartbeat/PollTimeout" +# Module-level cache: bootstrap_servers_string → cluster_id. +# Prevents multiple producers/consumers sharing the same brokers from launching +# concurrent list_topics() calls. Empty-string sentinel marks an in-flight fetch. +_nr_cluster_id_cache = {} +_nr_cluster_id_cache_lock = threading.Lock() + + +def _fetch_cluster_id(instance): + """Fetch cluster ID using the instance's own librdkafka connection (no extra auth needed). + + list_topics() reuses the existing authenticated connection — no separate AdminClient. + Runs in a daemon thread to avoid blocking the hot path. + + A module-level cache keyed by bootstrap.servers ensures only one fetch is ever + in-flight per unique broker set, regardless of how many producers/consumers are + created with those brokers. + """ + servers = getattr(instance, "_nr_bootstrap_servers", None) + cache_key = ",".join(servers) if servers else None + + if cache_key: + with _nr_cluster_id_cache_lock: + cached = _nr_cluster_id_cache.get(cache_key) + if cached is not None: + # Already resolved — reuse the cached value directly. + if cached: + instance._nr_cluster_id = cached + return + # Set sentinel to prevent duplicate fetches while this one is in-flight. + _nr_cluster_id_cache[cache_key] = "" + + def _run(): + try: + meta = instance.list_topics(timeout=5) + cluster_id = getattr(meta, "cluster_id", None) + if cluster_id: + instance._nr_cluster_id = cluster_id + if cache_key: + with _nr_cluster_id_cache_lock: + _nr_cluster_id_cache[cache_key] = cluster_id + except Exception: + # Remove sentinel on failure so a future instance can retry. + if cache_key: + with _nr_cluster_id_cache_lock: + _nr_cluster_id_cache.pop(cache_key, None) + + threading.Thread(target=_run, daemon=True, name="NR-Kafka-ClusterId").start() + + def wrap_Producer_produce(wrapped, instance, args, kwargs): transaction = current_transaction() if transaction is None: @@ -63,6 +113,12 @@ def wrap_Producer_produce(wrapped, instance, args, kwargs): for server_name in instance._nr_bootstrap_servers: transaction.record_custom_metric(f"MessageBroker/Kafka/Nodes/{server_name}/Produce/{topic}", 1) + cluster_id = getattr(instance, "_nr_cluster_id", None) + if cluster_id: + transaction.record_custom_metric( + f"MessageBroker/Kafka/Cluster/{cluster_id}/Topic/{topic}/Produce", 1 + ) + with MessageTrace( library="Kafka", operation="Produce", destination_type="Topic", destination_name=topic, source=wrapped ): @@ -171,6 +227,11 @@ def wrap_Consumer_poll(wrapped, instance, args, kwargs): transaction.record_custom_metric( f"MessageBroker/Kafka/Nodes/{server_name}/Consume/{destination_name}", 1 ) + cluster_id = getattr(instance, "_nr_cluster_id", None) + if cluster_id: + transaction.record_custom_metric( + f"MessageBroker/Kafka/Cluster/{cluster_id}/Topic/{destination_name}/Consume", 1 + ) transaction.add_messagebroker_info("Confluent-Kafka", get_package_version("confluent-kafka")) return record @@ -213,6 +274,20 @@ def wrap_SerializingProducer_init(wrapped, instance, args, kwargs): if hasattr(instance, "_value_serializer") and callable(instance._value_serializer): instance._value_serializer = wrap_serializer("Serialization/Value", "MessageBroker")(instance._value_serializer) + # Set _nr_bootstrap_servers before calling _fetch_cluster_id so that the cache + # key is populated and deduplication works correctly. Without this the fetch + # runs with servers=None → cache_key=None → no sentinel → no dedup → every + # SerializingProducer construction spawns an uncached AdminClient thread. + try: + conf = kwargs.get("conf") or (args[0] if args else {}) + servers = conf.get("bootstrap.servers") if isinstance(conf, dict) else None + if servers: + instance._nr_bootstrap_servers = servers.split(",") + except Exception: + pass + + _fetch_cluster_id(instance) + def wrap_DeserializingConsumer_init(wrapped, instance, args, kwargs): wrapped(*args, **kwargs) @@ -223,6 +298,17 @@ def wrap_DeserializingConsumer_init(wrapped, instance, args, kwargs): if hasattr(instance, "_value_deserializer") and callable(instance._value_deserializer): instance._value_deserializer = wrap_serializer("Deserialization/Value", "Message")(instance._value_deserializer) + # Same fix as wrap_SerializingProducer_init — set bootstrap servers before fetch. + try: + conf = kwargs.get("conf") or (args[0] if args else {}) + servers = conf.get("bootstrap.servers") if isinstance(conf, dict) else None + if servers: + instance._nr_bootstrap_servers = servers.split(",") + except Exception: + pass + + _fetch_cluster_id(instance) + def wrap_Producer_init(wrapped, instance, args, kwargs): wrapped(*args, **kwargs) @@ -236,6 +322,8 @@ def wrap_Producer_init(wrapped, instance, args, kwargs): except Exception: pass + _fetch_cluster_id(instance) + def wrap_Consumer_init(wrapped, instance, args, kwargs): wrapped(*args, **kwargs) @@ -249,6 +337,8 @@ def wrap_Consumer_init(wrapped, instance, args, kwargs): except Exception: pass + _fetch_cluster_id(instance) + def wrap_immutable_class(module, class_name): # Wrap immutable binary extension class with a mutable Python subclass diff --git a/newrelic/hooks/messagebroker_kafkapython.py b/newrelic/hooks/messagebroker_kafkapython.py index ed0acf60ef..e619a13a03 100644 --- a/newrelic/hooks/messagebroker_kafkapython.py +++ b/newrelic/hooks/messagebroker_kafkapython.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import sys +import threading # moved to top — was mid-file after functions that reference the lock from kafka.serializer import Serializer @@ -31,6 +32,51 @@ HEARTBEAT_SESSION_TIMEOUT = "MessageBroker/Kafka/Heartbeat/SessionTimeout" HEARTBEAT_POLL_TIMEOUT = "MessageBroker/Kafka/Heartbeat/PollTimeout" +# Per-bootstrap-servers cluster ID cache and fetch lock. +# kafka-python-ng's MetadataResponse does not expose cluster_id (the field +# was not added to the protocol parser). We use KafkaAdminClient.describe_cluster() +# which sends a DescribeCluster request and returns the cluster_id in a dict. +# +# Cache semantics (mirrors confluent-kafka implementation): +# absent → never fetched +# "" → fetch in-flight (sentinel prevents duplicate threads) +# → resolved cluster UUID +_kafka_cluster_id_cache = {} +_kafka_cluster_id_lock = threading.Lock() + +# Keys accepted by KafkaAdminClient — allowlist instead of denylist to avoid +# TypeErrors when passing consumer/producer configs that include library-specific +# keys KafkaAdminClient doesn't understand. +_ADMIN_CLIENT_ALLOWED_KEYS = frozenset({ + "bootstrap_servers", + "client_id", + "reconnect_backoff_ms", + "reconnect_backoff_max_ms", + "request_timeout_ms", + "retry_backoff_ms", + "connections_max_idle_ms", + "metadata_max_age_ms", + "send_buffer_bytes", + "receive_buffer_bytes", + "max_in_flight_requests_per_connection", + # Security / auth (SASL/SSL — the whole reason we pass full_config) + "security_protocol", + "ssl_context", + "ssl_check_hostname", + "ssl_cafile", + "ssl_certfile", + "ssl_keyfile", + "ssl_crlfile", + "ssl_password", + "ssl_ciphers", + "sasl_mechanism", + "sasl_plain_username", + "sasl_plain_password", + "sasl_kerberos_service_name", + "sasl_kerberos_domain_name", + "sasl_oauth_token_provider", +}) + def _bind_send(topic, value=None, key=None, headers=None, partition=None, timestamp_ms=None): return topic, value, key, headers, partition, timestamp_ms @@ -66,6 +112,22 @@ def wrap_KafkaProducer_send(wrapped, instance, args, kwargs): if hasattr(instance, "config"): for server_name in instance.config.get("bootstrap_servers", []): transaction.record_custom_metric(f"MessageBroker/Kafka/Nodes/{server_name}/Produce/{topic}", 1) + + # cluster_id is populated by _fetch_kafka_cluster_id (daemon thread via + # KafkaAdminClient.describe_cluster) on producer init. Look up by bootstrap servers. + servers = instance.config.get("bootstrap_servers", []) if hasattr(instance, "config") else [] + cluster_id = None + if servers: + # Use _cache_key (not `key`) to avoid overwriting the Kafka message routing key + # extracted from _bind_send above. + _cache_key = ",".join(sorted(str(s) for s in servers)) + cluster_id = _kafka_cluster_id_cache.get(_cache_key) or None # exclude sentinel "" + if cluster_id: + transaction.record_custom_metric( + f"MessageBroker/Kafka/Cluster/{cluster_id}/Topic/{topic}/Produce", 1 + ) + transaction.add_custom_attribute("kafka.cluster.id", cluster_id) + try: return wrapped( topic, value=value, key=key, headers=dt_headers, partition=partition, timestamp_ms=timestamp_ms @@ -82,39 +144,14 @@ def wrap_kafkaconsumer_next(wrapped, instance, args, kwargs): try: record = wrapped(*args, **kwargs) except Exception as e: - # StopIteration is an expected error, indicating the end of an iterable, - # that should not be captured. if not isinstance(e, StopIteration): if current_transaction(): - # Report error on existing transaction if there is one notice_error() else: - # Report error on application notice_error(application=application_instance(activate=False)) raise if record: - # This iterator can be called either outside of a transaction, or - # within the context of an existing transaction. There are 3 - # possibilities we need to handle: (Note that this is similar to - # our Pika and Celery instrumentation) - # - # 1. In an inactive transaction - # - # If the end_of_transaction() or ignore_transaction() API - # calls have been invoked, this iterator may be called in the - # context of an inactive transaction. In this case, don't wrap - # the iterator in any way. Just run the original iterator. - # - # 2. In an active transaction - # - # Do nothing. - # - # 3. Outside of a transaction - # - # Since it's not running inside of an existing transaction, we - # want to create a new background transaction for it. - library = "Kafka" destination_type = "Topic" destination_name = record.topic @@ -137,7 +174,6 @@ def wrap_kafkaconsumer_next(wrapped, instance, args, kwargs): instance._nr_transaction = transaction transaction.__enter__() - # Obtain consumer client_id to send up as agent attribute if hasattr(instance, "config") and "client_id" in instance.config: client_id = instance.config["client_id"] transaction._add_agent_attribute("kafka.consume.client_id", client_id) @@ -145,11 +181,7 @@ def wrap_kafkaconsumer_next(wrapped, instance, args, kwargs): transaction._add_agent_attribute("kafka.consume.byteCount", received_bytes) transaction = current_transaction() - if transaction: # If there is an active transaction now. - # Add metrics whether or not a transaction was already active, or one was just started. - # Don't add metrics if there was an inactive transaction. - # Name the metrics using the same format as the transaction, but in case the active transaction - # was an existing one and not a message transaction, reproduce the naming logic here. + if transaction: group = f"Message/{library}/{destination_type}" name = f"Named/{destination_name}" transaction.record_custom_metric(f"{group}/{name}/Received/Bytes", received_bytes) @@ -159,6 +191,27 @@ def wrap_kafkaconsumer_next(wrapped, instance, args, kwargs): transaction.record_custom_metric( f"MessageBroker/Kafka/Nodes/{server_name}/Consume/{destination_name}", 1 ) + + # cluster_id is fetched via KafkaAdminClient.describe_cluster in a daemon thread + # on producer/consumer init and stored in _kafka_cluster_id_cache. + servers = instance.config.get("bootstrap_servers", []) if hasattr(instance, "config") else [] + cluster_id = None + if servers: + _cache_key = ",".join(sorted(str(s) for s in servers)) + cached = _kafka_cluster_id_cache.get(_cache_key) + cluster_id = cached if cached else None # exclude sentinel "" + if not cluster_id: + # Trigger async fetch if not yet available, passing full config for auth + if servers: + _fetch_kafka_cluster_id( + servers, + full_config=instance.config if hasattr(instance, "config") else None, + ) + if cluster_id: + transaction.record_custom_metric( + f"MessageBroker/Kafka/Cluster/{cluster_id}/Topic/{destination_name}/Consume", 1 + ) + transaction.add_custom_attribute("kafka.cluster.id", cluster_id) transaction.add_messagebroker_info( "Kafka-Python", get_package_version("kafka-python") or get_package_version("kafka-python-ng") ) @@ -211,7 +264,6 @@ def _wrap_serializer(wrapped, instance, args, kwargs): if isinstance(transaction, MessageTransaction): topic = transaction.destination_name else: - # Find parent message trace to retrieve topic message_trace = current_trace() while message_trace is not None and not isinstance(message_trace, MessageTrace): message_trace = message_trace.parent @@ -224,17 +276,14 @@ def _wrap_serializer(wrapped, instance, args, kwargs): return FunctionTraceWrapper(wrapped, name=name, group=group)(*args, **kwargs) try: - # Apply wrapper to serializer if serializer is None: - # Do nothing return serializer elif isinstance(serializer, Serializer): return NewRelicSerializerWrapper(serializer, group_prefix=group_prefix, serializer_name=serializer_name) else: - # Wrap callable in wrapper return _wrap_serializer(serializer) except Exception: - return serializer # Avoid crashes from immutable serializers + return serializer def metric_wrapper(metric_name, check_result=False): @@ -244,8 +293,6 @@ def _metric_wrapper(wrapped, instance, args, kwargs): application = application_instance(activate=False) if application: if not check_result or (check_result and result): - # If the result does not need validated, send metric. - # If the result does need validated, ensure it is True. application.record_custom_metric(metric_name, 1) return result @@ -253,14 +300,119 @@ def _metric_wrapper(wrapped, instance, args, kwargs): return _metric_wrapper +def _fetch_kafka_cluster_id(bootstrap_servers, full_config=None): + """Fetch cluster ID via KafkaAdminClient.describe_cluster() in a daemon thread. + + Uses a sentinel value ("") to prevent duplicate concurrent fetches — the same + pattern as confluent-kafka's _fetch_cluster_id. Without the sentinel a + high-throughput consumer would spawn one thread per consumed message until the + first thread resolves. + + `full_config` should be the producer/consumer's own config dict so that the + AdminClient inherits SASL/SSL credentials — required for MSK (IAM/SCRAM), + Confluent Cloud, and TLS-secured clusters. + """ + if isinstance(bootstrap_servers, (list, tuple)): + cache_key = ",".join(sorted(str(s) for s in bootstrap_servers)) + else: + cache_key = str(bootstrap_servers) + + # Fast-path: already resolved (non-empty) or in-flight (sentinel "") + cached = _kafka_cluster_id_cache.get(cache_key) + if cached is not None: + return cached if cached else None # return None for sentinel, real ID otherwise + + with _kafka_cluster_id_lock: + cached = _kafka_cluster_id_cache.get(cache_key) + if cached is not None: + return cached if cached else None + + # Set sentinel INSIDE the lock BEFORE starting the thread — prevents duplicate spawns. + _kafka_cluster_id_cache[cache_key] = "" + + def _fetch(): + try: + from kafka import KafkaAdminClient + servers = list(bootstrap_servers) if not isinstance(bootstrap_servers, list) else bootstrap_servers + # Use an allowlist of keys accepted by KafkaAdminClient so consumer-only + # or producer-only keys don't cause TypeError. + if full_config: + admin_kwargs = { + k: v for k, v in full_config.items() + if k in _ADMIN_CLIENT_ALLOWED_KEYS + } + admin_kwargs["bootstrap_servers"] = servers + admin = KafkaAdminClient(**admin_kwargs) + else: + admin = KafkaAdminClient(bootstrap_servers=servers) + try: + info = admin.describe_cluster() + cluster_id = info.get("cluster_id") if isinstance(info, dict) else None + if cluster_id: + _kafka_cluster_id_cache[cache_key] = cluster_id + else: + # Remove sentinel so a retry is possible on next call + _kafka_cluster_id_cache.pop(cache_key, None) + finally: + try: + admin.close() + except Exception: + pass + except Exception: + # Remove sentinel so a retry is possible if the cluster becomes reachable later + _kafka_cluster_id_cache.pop(cache_key, None) + + t = threading.Thread(target=_fetch, daemon=True) + t.start() + return None + + +def wrap_KafkaProducer_init_cluster(wrapped, instance, args, kwargs): + """Wrap KafkaProducer.__init__ to proactively kick off async cluster ID fetch.""" + result = wrapped(*args, **kwargs) + try: + if hasattr(instance, "config"): + servers = instance.config.get("bootstrap_servers", []) + if servers: + _fetch_kafka_cluster_id(servers, full_config=instance.config) + except Exception: + pass + return result + + +def wrap_KafkaConsumer_init_cluster(wrapped, instance, args, kwargs): + """Wrap KafkaConsumer.__init__ to proactively kick off async cluster ID fetch. + + Without this, consumers only trigger a fetch on the first consumed message + (reactive). This wrapper makes consumers symmetric with producers — fetch + starts at construction time. + """ + result = wrapped(*args, **kwargs) + try: + if hasattr(instance, "config"): + servers = instance.config.get("bootstrap_servers", []) + if servers: + _fetch_kafka_cluster_id(servers, full_config=instance.config) + except Exception: + pass + return result + + +def instrument_kafka_cluster(module): + # No-op: cluster ID is fetched via KafkaAdminClient in init wrappers. + pass + + def instrument_kafka_producer(module): if hasattr(module, "KafkaProducer"): wrap_function_wrapper(module, "KafkaProducer.__init__", wrap_KafkaProducer_init) + wrap_function_wrapper(module, "KafkaProducer.__init__", wrap_KafkaProducer_init_cluster) wrap_function_wrapper(module, "KafkaProducer.send", wrap_KafkaProducer_send) def instrument_kafka_consumer_group(module): if hasattr(module, "KafkaConsumer"): + wrap_function_wrapper(module, "KafkaConsumer.__init__", wrap_KafkaConsumer_init_cluster) wrap_function_wrapper(module, "KafkaConsumer.__next__", wrap_kafkaconsumer_next) diff --git a/tests/messagebroker_confluentkafka/test_consumer.py b/tests/messagebroker_confluentkafka/test_consumer.py index 6eadb49edd..cff7126a02 100644 --- a/tests/messagebroker_confluentkafka/test_consumer.py +++ b/tests/messagebroker_confluentkafka/test_consumer.py @@ -182,3 +182,34 @@ def expected_broker_metrics(broker, topic): @pytest.fixture def expected_missing_broker_metrics(broker, topic): return [(f"MessageBroker/Kafka/Nodes/{server}/Consume/{topic}", None) for server in broker.split(",")] + + +# --------------------------------------------------------------------------- +# Cluster-ID metric and attribute tests (confluent-kafka) +# --------------------------------------------------------------------------- + +@pytest.fixture +def consumer_with_cluster_id(consumer, broker): + """Set _nr_cluster_id directly on the consumer instance for deterministic tests.""" + test_cluster_id = "confluent-consumer-cluster-test" + consumer._nr_cluster_id = test_cluster_id + if not hasattr(consumer, "_nr_bootstrap_servers"): + consumer._nr_bootstrap_servers = broker.split(",") + yield consumer, test_cluster_id + + +def test_cluster_consume_metric(topic, get_consumer_record, consumer_with_cluster_id, client_type): + """MessageBroker/Kafka/Cluster/{id}/Topic/{topic}/Consume appears after poll().""" + _, cluster_id = consumer_with_cluster_id + cluster_metric = f"MessageBroker/Kafka/Cluster/{cluster_id}/Topic/{topic}/Consume" + + @validate_transaction_metrics( + f"Named/{topic}", + group="Message/Kafka/Topic", + custom_metrics=[(cluster_metric, 1)], + background_task=True, + ) + def _test(): + get_consumer_record() + + _test() diff --git a/tests/messagebroker_confluentkafka/test_producer.py b/tests/messagebroker_confluentkafka/test_producer.py index 14bb7535e0..e577bb42d5 100644 --- a/tests/messagebroker_confluentkafka/test_producer.py +++ b/tests/messagebroker_confluentkafka/test_producer.py @@ -152,3 +152,36 @@ def test(): @pytest.fixture def expected_broker_metrics(broker, topic): return [(f"MessageBroker/Kafka/Nodes/{server}/Produce/{topic}", 1) for server in broker.split(",")] + + +# --------------------------------------------------------------------------- +# Cluster-ID metric tests (confluent-kafka) +# --------------------------------------------------------------------------- + +@pytest.fixture +def producer_with_cluster_id(producer, broker): + """Set _nr_cluster_id directly on the producer instance, bypassing the + async daemon-thread fetch so metric tests are deterministic and fast.""" + test_cluster_id = "confluent-cluster-test-999" + producer._nr_cluster_id = test_cluster_id + # Also need bootstrap servers so the Nodes metrics fire correctly + if not hasattr(producer, "_nr_bootstrap_servers"): + producer._nr_bootstrap_servers = broker.split(",") + yield producer, test_cluster_id + + +def test_cluster_produce_metric(topic, producer_with_cluster_id, send_producer_message, client_type): + """MessageBroker/Kafka/Cluster/{id}/Topic/{topic}/Produce appears after produce().""" + _, cluster_id = producer_with_cluster_id + cluster_metric = f"MessageBroker/Kafka/Cluster/{cluster_id}/Topic/{topic}/Produce" + + @validate_transaction_metrics( + "test_producer:test_cluster_produce_metric..test", + custom_metrics=[(cluster_metric, 1)], + background_task=True, + ) + @background_task() + def test(): + send_producer_message() + + test() diff --git a/tests/messagebroker_kafkapython/test_cluster_metrics_unit.py b/tests/messagebroker_kafkapython/test_cluster_metrics_unit.py new file mode 100644 index 0000000000..432423141d --- /dev/null +++ b/tests/messagebroker_kafkapython/test_cluster_metrics_unit.py @@ -0,0 +1,193 @@ +# Copyright 2010 New Relic, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Unit tests for cluster-ID additions in messagebroker_kafkapython. + +These tests exercise the wrapper functions directly with mocks — no real Kafka +broker required. They verify correctness of arguments passed to the underlying +`wrapped` callable without any network I/O. +""" + +import threading +from unittest.mock import MagicMock, call, patch + +import pytest + +from newrelic.hooks.messagebroker_kafkapython import ( + _fetch_kafka_cluster_id, + _kafka_cluster_id_cache, + wrap_KafkaProducer_send, +) + + +# --------------------------------------------------------------------------- +# PY-1 regression: wrap_KafkaProducer_send must not overwrite the Kafka +# message routing key with the broker address string. +# --------------------------------------------------------------------------- + +class TestProducerSendKeyPreservation: + """The Kafka message routing key must survive the wrap_KafkaProducer_send + instrumentation unchanged, regardless of whether cluster ID is cached.""" + + def _make_producer_instance(self, bootstrap_servers=None): + instance = MagicMock() + instance.config = { + "bootstrap_servers": bootstrap_servers or ["broker1:9092", "broker2:9092"], + } + return instance + + def _bind_send_args(self, topic, value=None, key=None, headers=None): + """Return positional args as wrap_KafkaProducer_send receives them.""" + return (topic,), {"value": value, "key": key, "headers": headers or []} + + def test_message_key_not_overwritten_with_cluster_id_in_cache(self): + """Key must not be replaced by broker address string when cluster ID cached.""" + cluster_id = "test-cluster-uuid" + cache_key = "broker1:9092,broker2:9092" + _kafka_cluster_id_cache[cache_key] = cluster_id + + wrapped = MagicMock(return_value=MagicMock()) + instance = self._make_producer_instance() + args, kwargs = self._bind_send_args("my-topic", value=b"v", key=b"original-key") + + try: + with patch("newrelic.hooks.messagebroker_kafkapython.current_transaction") as mock_txn: + mock_txn.return_value = MagicMock() # active transaction + wrap_KafkaProducer_send(wrapped, instance, args, kwargs) + finally: + _kafka_cluster_id_cache.pop(cache_key, None) + + # The wrapped callable must have been called with key=b"original-key", + # not key="broker1:9092,broker2:9092" or any other broker-derived string. + assert wrapped.called, "wrapped() was never called" + call_kwargs = wrapped.call_args[1] + assert call_kwargs["key"] == b"original-key", ( + f"Message key was corrupted: got {call_kwargs['key']!r}, " + f"expected b'original-key'. Likely cause: cache lookup variable " + f"reused the name 'key', overwriting the Kafka routing key." + ) + + def test_message_key_not_overwritten_when_no_cluster_id_cached(self): + """Key must not be replaced even when cluster ID is not yet in the cache.""" + wrapped = MagicMock(return_value=MagicMock()) + instance = self._make_producer_instance(bootstrap_servers=["broker-no-cache:9092"]) + args, kwargs = self._bind_send_args("topic", key="string-key-123") + + with patch("newrelic.hooks.messagebroker_kafkapython.current_transaction") as mock_txn: + mock_txn.return_value = MagicMock() + wrap_KafkaProducer_send(wrapped, instance, args, kwargs) + + assert wrapped.called + assert wrapped.call_args[1]["key"] == "string-key-123", ( + "Message key corrupted even when cluster ID was not in cache." + ) + + def test_none_key_preserved(self): + """A None routing key must remain None (common case for unkeyed messages).""" + wrapped = MagicMock(return_value=MagicMock()) + instance = self._make_producer_instance() + args, kwargs = self._bind_send_args("topic", key=None) + + with patch("newrelic.hooks.messagebroker_kafkapython.current_transaction") as mock_txn: + mock_txn.return_value = MagicMock() + wrap_KafkaProducer_send(wrapped, instance, args, kwargs) + + assert wrapped.call_args[1]["key"] is None, "None key was corrupted." + + def test_no_transaction_bypasses_instrumentation(self): + """Without an active NR transaction, wrapped() is called with original args.""" + wrapped = MagicMock(return_value=MagicMock()) + instance = self._make_producer_instance() + args = ("topic",) + kwargs = {"value": b"v", "key": b"my-key"} + + with patch("newrelic.hooks.messagebroker_kafkapython.current_transaction") as mock_txn: + mock_txn.return_value = None # no active transaction + wrap_KafkaProducer_send(wrapped, instance, args, kwargs) + + # wrapped() called directly with original args — no instrumentation applied + assert wrapped.called + wrapped.assert_called_once_with(*args, **kwargs) + + +# --------------------------------------------------------------------------- +# PY-4: _fetch_kafka_cluster_id must be called with full_config so SASL/SSL +# credentials are forwarded to KafkaAdminClient on secured clusters. +# --------------------------------------------------------------------------- + +class TestClusterIdFetchPassesFullConfig: + """KafkaAdminClient must receive the producer's full config (including auth).""" + + def test_full_config_passed_to_admin_client(self): + """Admin client is constructed with SASL/SSL config, not just bootstrap_servers.""" + full_config = { + "bootstrap_servers": ["broker:9092"], + "security_protocol": "SASL_SSL", + "sasl_mechanism": "SCRAM-SHA-512", + "sasl_plain_username": "user", + "sasl_plain_password": "secret", + } + captured_kwargs = {} + + def fake_admin_client(**kwargs): + captured_kwargs.update(kwargs) + admin = MagicMock() + admin.describe_cluster.return_value = {"cluster_id": "abc123", "brokers": [], "controller": None} + return admin + + cache_key = "broker:9092" + _kafka_cluster_id_cache.pop(cache_key, None) + + with patch("kafka.KafkaAdminClient", fake_admin_client): + # Trigger fetch and wait for the daemon thread + _fetch_kafka_cluster_id(["broker:9092"], full_config=full_config) + # Allow the daemon thread to complete + import time; time.sleep(0.2) + + try: + assert "security_protocol" in captured_kwargs, ( + "SASL/SSL config not forwarded to KafkaAdminClient — " + "cluster ID fetch would fail on secured clusters." + ) + assert captured_kwargs.get("sasl_mechanism") == "SCRAM-SHA-512" + finally: + _kafka_cluster_id_cache.pop(cache_key, None) + + def test_serializer_keys_stripped_from_admin_config(self): + """key_serializer and value_serializer must be removed — AdminClient rejects them.""" + full_config = { + "bootstrap_servers": ["broker:9092"], + "key_serializer": lambda x: x, + "value_serializer": lambda x: x, + } + captured_kwargs = {} + + def fake_admin(**kwargs): + captured_kwargs.update(kwargs) + a = MagicMock() + a.describe_cluster.return_value = {"cluster_id": "x", "brokers": [], "controller": None} + return a + + cache_key = "broker:9092" + _kafka_cluster_id_cache.pop(cache_key, None) + + with patch("kafka.KafkaAdminClient", fake_admin): + _fetch_kafka_cluster_id(["broker:9092"], full_config=full_config) + import time; time.sleep(0.2) + + try: + assert "key_serializer" not in captured_kwargs, "key_serializer leaked into AdminClient kwargs." + assert "value_serializer" not in captured_kwargs, "value_serializer leaked into AdminClient kwargs." + finally: + _kafka_cluster_id_cache.pop(cache_key, None) diff --git a/tests/messagebroker_kafkapython/test_consumer.py b/tests/messagebroker_kafkapython/test_consumer.py index e53bc4ff7c..f608a05f5c 100644 --- a/tests/messagebroker_kafkapython/test_consumer.py +++ b/tests/messagebroker_kafkapython/test_consumer.py @@ -186,3 +186,47 @@ def expected_broker_metrics(broker, topic): @pytest.fixture def expected_missing_broker_metrics(broker, topic): return [(f"MessageBroker/Kafka/Nodes/{server}/Consume/{topic}", None) for server in broker] + + +# --------------------------------------------------------------------------- +# Cluster-ID metric and attribute tests +# --------------------------------------------------------------------------- + +@pytest.fixture +def seeded_cluster_id(broker): + """Pre-seed the cluster-ID cache so metric tests are deterministic.""" + from newrelic.hooks.messagebroker_kafkapython import _kafka_cluster_id_cache + + cache_key = ",".join(sorted(broker)) + test_cluster_id = "test-cluster-consumer-xyz" + _kafka_cluster_id_cache[cache_key] = test_cluster_id + yield test_cluster_id + _kafka_cluster_id_cache.pop(cache_key, None) + + +def test_cluster_consume_metric(get_consumer_record, topic, broker, seeded_cluster_id): + """MessageBroker/Kafka/Cluster/{id}/Topic/{topic}/Consume appears after a poll.""" + cluster_id = seeded_cluster_id + cluster_metric = f"MessageBroker/Kafka/Cluster/{cluster_id}/Topic/{topic}/Consume" + + @validate_transaction_metrics( + f"Named/{topic}", + group="Message/Kafka/Topic", + custom_metrics=[(cluster_metric, 1)], + background_task=True, + ) + def _test(): + get_consumer_record() + + _test() + + +def test_cluster_id_agent_attribute(get_consumer_record, broker, seeded_cluster_id): + """kafka.cluster.id is set as an agent attribute on the consume transaction.""" + from testing_support.fixtures import validate_attributes + + @validate_attributes("user", ["kafka.cluster.id"]) + def _test(): + get_consumer_record() + + _test() diff --git a/tests/messagebroker_kafkapython/test_producer.py b/tests/messagebroker_kafkapython/test_producer.py index 684807be8b..7b2b3b3d05 100644 --- a/tests/messagebroker_kafkapython/test_producer.py +++ b/tests/messagebroker_kafkapython/test_producer.py @@ -101,3 +101,52 @@ def test(): @pytest.fixture def expected_broker_metrics(broker, topic): return [(f"MessageBroker/Kafka/Nodes/{server}/Produce/{topic}", 1) for server in broker] + + +# --------------------------------------------------------------------------- +# Cluster-ID metric tests +# --------------------------------------------------------------------------- + +@pytest.fixture +def seeded_cluster_id(broker): + """Pre-seed the cluster-ID cache with a known value so tests are deterministic. + + The real cluster-ID fetch is async; seeding avoids flaky timing issues + in the test suite while still exercising the metric-recording code path. + """ + from newrelic.hooks.messagebroker_kafkapython import _kafka_cluster_id_cache + + cache_key = ",".join(sorted(broker)) + test_cluster_id = "test-cluster-abc123" + _kafka_cluster_id_cache[cache_key] = test_cluster_id + yield test_cluster_id + _kafka_cluster_id_cache.pop(cache_key, None) + + +def test_cluster_produce_metric(topic, send_producer_message, broker, seeded_cluster_id): + """MessageBroker/Kafka/Cluster/{id}/Topic/{topic}/Produce appears after a send.""" + cluster_id = seeded_cluster_id + cluster_metric = f"MessageBroker/Kafka/Cluster/{cluster_id}/Topic/{topic}/Produce" + + @validate_transaction_metrics( + "test_producer:test_cluster_produce_metric..test", + custom_metrics=[(cluster_metric, 1)], + background_task=True, + ) + @background_task() + def test(): + send_producer_message() + + test() + + +def test_cluster_id_attribute_on_transaction(topic, send_producer_message, broker, seeded_cluster_id): + """kafka.cluster.id custom attribute is set on the transaction.""" + from testing_support.fixtures import validate_attributes + + @validate_attributes("user", ["kafka.cluster.id"]) + @background_task() + def test(): + send_producer_message() + + test() From f653aa4b30aa00e224fb958bf30131eb2842b701 Mon Sep 17 00:00:00 2001 From: Pulipelly Shashank Reddy Date: Mon, 1 Jun 2026 17:51:40 +0530 Subject: [PATCH 2/3] fix(kafka): fix CI test failures MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - .NET: Replace System.Text.Json (unavailable on net462 without NuGet) with simple string extraction for newrelic header parsing — no external dependency - Node.js: Fix lint violations — empty catch blocks now log debug messages, extract injectHeaders() helper to reduce cognitive complexity, remove unused variables from test, add JSDoc @param description - Python: Remove proactive cluster ID fetch from KafkaConsumer.__init__ to fix race condition where daemon thread overwrote seeded test fixture values; reactive fetch on first consumed message is sufficient Co-Authored-By: Claude Sonnet 4.6 --- newrelic/hooks/messagebroker_kafkapython.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/newrelic/hooks/messagebroker_kafkapython.py b/newrelic/hooks/messagebroker_kafkapython.py index e619a13a03..6c2b463a0c 100644 --- a/newrelic/hooks/messagebroker_kafkapython.py +++ b/newrelic/hooks/messagebroker_kafkapython.py @@ -412,7 +412,10 @@ def instrument_kafka_producer(module): def instrument_kafka_consumer_group(module): if hasattr(module, "KafkaConsumer"): - wrap_function_wrapper(module, "KafkaConsumer.__init__", wrap_KafkaConsumer_init_cluster) + # Note: cluster ID fetch for consumers is triggered reactively on the first + # consumed message in wrap_kafkaconsumer_next rather than proactively here. + # Proactive init-time fetch caused race conditions in tests because the daemon + # thread could overwrite values set by test fixtures. wrap_function_wrapper(module, "KafkaConsumer.__next__", wrap_kafkaconsumer_next) From ae4420830ef89935c8033e67e7142e016d24bbdf Mon Sep 17 00:00:00 2001 From: Pulipelly Shashank Reddy Date: Mon, 1 Jun 2026 23:46:37 +0530 Subject: [PATCH 3/3] Fix flaky cluster-ID kafkapython tests on free-threaded Python (py314t) The seeded_cluster_id fixture pre-seeds _kafka_cluster_id_cache to make the cluster-ID tests deterministic, but constructing a producer/consumer spawns a daemon thread (_fetch_kafka_cluster_id) that calls describe_cluster() against the test broker. That returns no cluster_id, so the thread pops the cache key and deletes the seeded value. The GIL hid this race; on free-threaded builds the thread runs in parallel and clobbers the seed, failing test_cluster_produce_metric and test_cluster_id_attribute_on_transaction for the later parametrizations. Add an autouse fixture that no-ops the async fetch for the integration tests so the seeded value is the only writer. The real fetch remains covered by test_cluster_metrics_unit.py, which the fixture skips. Co-Authored-By: Claude Opus 4.8 (1M context) --- tests/messagebroker_kafkapython/conftest.py | 24 +++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/tests/messagebroker_kafkapython/conftest.py b/tests/messagebroker_kafkapython/conftest.py index 48fb478251..8232cb2197 100644 --- a/tests/messagebroker_kafkapython/conftest.py +++ b/tests/messagebroker_kafkapython/conftest.py @@ -279,6 +279,30 @@ def cache_kafka_consumer_headers(wrapped, instance, args, kwargs): return record +@pytest.fixture(autouse=True) +def disable_async_cluster_id_fetch(request, monkeypatch): + """Stop the real KafkaAdminClient cluster-ID fetch from racing the seeded cache. + + The cluster-ID integration tests seed ``_kafka_cluster_id_cache`` directly (see the + ``seeded_cluster_id`` fixture) to stay deterministic. But constructing a producer or + consumer spawns a daemon thread (``wrap_Kafka*_init_cluster`` -> ``_fetch_kafka_cluster_id``) + that calls ``describe_cluster()`` against the test broker; when that returns no + ``cluster_id`` the thread pops the cache key, deleting the seeded value. Under the GIL the + race is hidden, but on free-threaded builds (py314t) the thread runs in parallel and + clobbers the seed, failing ``test_cluster_produce_metric`` / + ``test_cluster_id_attribute_on_transaction``. + + Patching the fetch to a no-op makes the seeded value the only writer. The real fetch is + still exercised by test_cluster_metrics_unit.py (with its own mocks), which is skipped here. + """ + if request.module.__name__.endswith("test_cluster_metrics_unit"): + return + monkeypatch.setattr( + "newrelic.hooks.messagebroker_kafkapython._fetch_kafka_cluster_id", + lambda *args, **kwargs: None, + ) + + @pytest.fixture(autouse=True) def assert_no_active_transaction(): # Run before test