From 2f4dd5fba1ce4ceae76d72b662593fb27c5097f1 Mon Sep 17 00:00:00 2001 From: Damien Mehala Date: Sun, 6 Apr 2025 15:31:06 +0200 Subject: [PATCH 1/9] [part 8] refactor!(telemetry): rework metrics API MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Redesigned the metrics API to eliminate the need for pre-registering a list of metrics. Previously, the API relied on a pull-based approach where metrics had to be registered during the construction of the telemetry module. The module would then periodically retrieve values from the registered metrics. While functional, this design was cumbersome—especially for metrics involving dynamic tags. This refactoring introduces a push-based mechanism. Calls to `rate`, `counter`, or `distribution` now directly send data to the telemetry module, removing the need for upfront registration. This approach simplifies usage and improves flexibility. --- BUILD.bazel | 6 +- CMakeLists.txt | 3 +- include/datadog/telemetry/metrics.h | 156 ++------ include/datadog/telemetry/telemetry.h | 90 ++++- src/datadog/common/hash.cpp | 317 ++++++++++++++++ src/datadog/common/hash.h | 448 +++++++++++++++++++++++ src/datadog/datadog_agent.cpp | 21 +- src/datadog/telemetry/metric_context.h | 67 ++++ src/datadog/telemetry/metrics.cpp | 43 --- src/datadog/telemetry/telemetry.cpp | 127 ++++++- src/datadog/telemetry/telemetry_impl.cpp | 354 +++++++++--------- src/datadog/telemetry/telemetry_impl.h | 65 +++- src/datadog/telemetry_metrics.cpp | 27 ++ src/datadog/telemetry_metrics.h | 53 +++ src/datadog/trace_segment.cpp | 10 +- src/datadog/tracer.cpp | 16 +- test/CMakeLists.txt | 1 - test/telemetry/test_metrics.cpp | 40 -- test/telemetry/test_telemetry.cpp | 414 +++++++++++++++------ 19 files changed, 1682 insertions(+), 576 deletions(-) create mode 100644 src/datadog/common/hash.cpp create mode 100644 src/datadog/common/hash.h create mode 100644 src/datadog/telemetry/metric_context.h delete mode 100644 
src/datadog/telemetry/metrics.cpp create mode 100644 src/datadog/telemetry_metrics.cpp create mode 100644 src/datadog/telemetry_metrics.h delete mode 100644 test/telemetry/test_metrics.cpp diff --git a/BUILD.bazel b/BUILD.bazel index eb59ad4f..a95c9d9d 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -1,12 +1,13 @@ cc_library( name = "dd_trace_cpp", srcs = [ + "src/datadog/common/hash.cpp", "src/datadog/telemetry/configuration.cpp", - "src/datadog/telemetry/metrics.cpp", "src/datadog/telemetry/log.h", "src/datadog/telemetry/telemetry.cpp", "src/datadog/telemetry/telemetry_impl.h", "src/datadog/telemetry/telemetry_impl.cpp", + "src/datadog/telemetry/metric_context.h", "src/datadog/baggage.cpp", "src/datadog/base64.cpp", "src/datadog/cerr_logger.cpp", @@ -41,6 +42,7 @@ cc_library( "src/datadog/string_util.cpp", "src/datadog/tag_propagation.cpp", "src/datadog/tags.cpp", + "src/datadog/telemetry_metrics.cpp", "src/datadog/threaded_event_scheduler.cpp", "src/datadog/tracer_config.cpp", "src/datadog/tracer.cpp", @@ -50,6 +52,7 @@ cc_library( "src/datadog/trace_segment.cpp", "src/datadog/version.cpp", "src/datadog/w3c_propagation.cpp", + "src/datadog/common/hash.h", "src/datadog/base64.h", "src/datadog/config_manager.h", "src/datadog/collector_response.h", @@ -74,6 +77,7 @@ cc_library( "src/datadog/string_util.h", "src/datadog/tag_propagation.h", "src/datadog/tags.h", + "src/datadog/telemetry_metrics.h", "src/datadog/threaded_event_scheduler.h", "src/datadog/trace_sampler.h", "src/datadog/w3c_propagation.h", diff --git a/CMakeLists.txt b/CMakeLists.txt index e807b0a0..ecbf5612 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -106,8 +106,8 @@ target_sources(dd_trace_cpp-objects BASE_DIRS include FILES ${public_header_files} PRIVATE + src/datadog/common/hash.cpp src/datadog/telemetry/configuration.cpp - src/datadog/telemetry/metrics.cpp src/datadog/telemetry/telemetry.cpp src/datadog/telemetry/telemetry_impl.cpp src/datadog/baggage.cpp @@ -150,6 +150,7 @@ 
target_sources(dd_trace_cpp-objects src/datadog/trace_sampler_config.cpp src/datadog/trace_sampler.cpp src/datadog/trace_segment.cpp + src/datadog/telemetry_metrics.cpp src/datadog/version.cpp src/datadog/w3c_propagation.cpp ) diff --git a/include/datadog/telemetry/metrics.h b/include/datadog/telemetry/metrics.h index 45beb464..3cca54e3 100644 --- a/include/datadog/telemetry/metrics.h +++ b/include/datadog/telemetry/metrics.h @@ -1,146 +1,34 @@ #pragma once -// This component provides an interface, `Metric`, and specific classes for -// Counter and Gauge metrics. A metric has a name, type, and set of key:value -// tags associated with it. Metrics can be general to APM or language-specific. -// General metrics have `common` set to `true`, and language-specific metrics -// have `common` set to `false`. - -#include #include -#include namespace datadog { namespace telemetry { -class Metric { - // The name of the metric that will be published. A transformation occurs - // based on the name and whether it is "common" or "language-specific" when it - // is recorded. - std::string name_; - // The type of the metric. This will currently be count or gauge. - std::string type_; - // Namespace of the metric. - std::string scope_; - // Tags associated with this specific instance of the metric. - std::vector tags_; - // This affects the transformation of the metric name, where it can be a - // common telemetry metric, or a language-specific metric that is prefixed - // with the language name. 
- bool common_; - - protected: - std::atomic value_ = 0; - Metric(std::string name, std::string type, std::string scope, - std::vector tags, bool common); - - Metric(Metric&& rhs) - : name_(std::move(rhs.name_)), - type_(std::move(rhs.type_)), - scope_(std::move(rhs.scope_)), - tags_(std::move(rhs.tags_)) { - rhs.value_.store(value_.exchange(rhs.value_)); - } - - Metric& operator=(Metric&& rhs) { - if (&rhs != this) { - std::swap(name_, rhs.name_); - std::swap(type_, rhs.type_); - std::swap(scope_, rhs.scope_); - std::swap(tags_, rhs.tags_); - rhs.value_.store(value_.exchange(rhs.value_)); - } - return *this; - } - - public: - // Accessors for name, type, tags, common and capture_and_reset_value are used - // when producing the JSON message for reporting metrics. - std::string name(); - std::string type(); - std::string scope(); - std::vector tags(); - bool common(); - uint64_t value(); - uint64_t capture_and_reset_value(); -}; - -// A count metric is used for measuring activity, and has methods for adding a -// number of actions, or incrementing the current number of actions by 1. -class CounterMetric : public Metric { - public: - CounterMetric(std::string name, std::string scope, - std::vector tags, bool common); - void inc(); - void add(uint64_t amount); +namespace details { +enum class MetricType : char { counter, rate, distribution }; +} + +/// TODO: pre-compute hash? +template +struct Metric final { + /// The type of the metric. + static constexpr details::MetricType type = T; + /// The name of the metric that will be published. A transformation occurs + /// based on the name and whether it is "common" or "language-specific" when + /// it is recorded. + std::string name; + /// Namespace of the metric. + std::string scope; + /// This affects the transformation of the metric name, where it can be a + /// common telemetry metric, or a language-specific metric that is prefixed + /// with the language name. 
+ bool common; }; -// A gauge metric is used for measuring state, and mas methods to set the -// current state, add or subtract from it, or increment/decrement the current -// state by 1. -class GaugeMetric : public Metric { - public: - GaugeMetric(std::string name, std::string scope, - std::vector tags, bool common); - void set(uint64_t value); - void inc(); - void add(uint64_t amount); - void dec(); - void sub(uint64_t amount); -}; - -// This structure contains all the metrics that are exposed by tracer -// telemetry. -struct DefaultMetrics { - struct { - telemetry::CounterMetric spans_created = { - "spans_created", "tracers", {}, true}; - telemetry::CounterMetric spans_finished = { - "spans_finished", "tracers", {}, true}; - - telemetry::CounterMetric trace_segments_created_new = { - "trace_segments_created", "tracers", {"new_continued:new"}, true}; - telemetry::CounterMetric trace_segments_created_continued = { - "trace_segments_created", "tracers", {"new_continued:continued"}, true}; - telemetry::CounterMetric trace_segments_closed = { - "trace_segments_closed", "tracers", {}, true}; - telemetry::CounterMetric baggage_items_exceeded = { - "context_header.truncated", - "tracers", - {{"truncation_reason:baggage_item_count_exceeded"}}, - true, - }; - telemetry::CounterMetric baggage_bytes_exceeded = { - "context_header.truncated", - "tracers", - {{"truncation_reason:baggage_byte_count_exceeded"}}, - true, - }; - } tracer; - struct { - telemetry::CounterMetric requests = { - "trace_api.requests", "tracers", {}, true}; - - telemetry::CounterMetric responses_1xx = { - "trace_api.responses", "tracers", {"status_code:1xx"}, true}; - telemetry::CounterMetric responses_2xx = { - "trace_api.responses", "tracers", {"status_code:2xx"}, true}; - telemetry::CounterMetric responses_3xx = { - "trace_api.responses", "tracers", {"status_code:3xx"}, true}; - telemetry::CounterMetric responses_4xx = { - "trace_api.responses", "tracers", {"status_code:4xx"}, true}; - 
telemetry::CounterMetric responses_5xx = { - "trace_api.responses", "tracers", {"status_code:5xx"}, true}; - - telemetry::CounterMetric errors_timeout = { - "trace_api.errors", "tracers", {"type:timeout"}, true}; - telemetry::CounterMetric errors_network = { - "trace_api.errors", "tracers", {"type:network"}, true}; - telemetry::CounterMetric errors_status_code = { - "trace_api.errors", "tracers", {"type:status_code"}, true}; - - } trace_api; -}; +using Counter = Metric; +using Rate = Metric; +using Distribution = Metric; } // namespace telemetry } // namespace datadog diff --git a/include/datadog/telemetry/telemetry.h b/include/datadog/telemetry/telemetry.h index b30f696f..9df22ff7 100644 --- a/include/datadog/telemetry/telemetry.h +++ b/include/datadog/telemetry/telemetry.h @@ -31,7 +31,6 @@ namespace datadog::telemetry { void init(FinalizedConfiguration configuration, std::shared_ptr logger, std::shared_ptr client, - std::vector> metrics, std::shared_ptr event_scheduler, tracing::HTTPClient::URL agent_url, tracing::Clock clock = tracing::default_clock); @@ -58,10 +57,6 @@ void send_configuration_change(); void capture_configuration_change( const std::vector& new_configuration); -/// Provides access to the telemetry metrics for updating the values. -/// This value should not be stored. -DefaultMetrics& metrics(); - /// Report internal warning message to Datadog. /// /// @param message The warning message to log. @@ -78,4 +73,89 @@ void report_error_log(std::string message); /// @param stacktrace Stacktrace leading to the error. void report_error_log(std::string message, std::string stacktrace); +/// The `counter` namespace provides functions to track values. +/// Counters can be useful for tracking the total number of an event occuring in +/// one time interval. For example, the amount of requests, errors or jobs +/// processed every 10 seconds. +namespace counter { + +/// Increments the specified counter by 1. +/// +/// @param `counter` the counter to increment. 
+void increment(const Counter& counter); + +/// Increments the specified counter by 1. +/// +/// @param `counter` the counter to increment. +/// @param `tags` the distribution tags. +void increment(const Counter& counter, const std::vector& tags); + +/// Decrements the specified counter by 1. +/// +/// @param `counter` the counter to decrement. +void decrement(const Counter& counter); + +/// Decrements the specified counter by 1. +/// +/// @param `counter` the counter to decrement. +/// @param `tags` the distribution tags. +void decrement(const Counter& counter, const std::vector& tags); + +/// Sets the counter to a specific value. +/// +/// @param `counter` the counter to update. +/// @param `value` the value to assign to the counter. +void set(const Counter& counter, uint64_t value); + +/// Sets the counter to a specific value. +/// +/// @param `counter` the counter to update. +/// @param `tags` the distribution tags. +/// @param `value` the value to assign to the counter. +void set(const Counter& counter, const std::vector& tags, + uint64_t value); + +} // namespace counter + +/// The `rate` namespace provides support for rate metrics-values. +/// Rates can be useful for tracking the total number of an event occurences in +/// one time interval. For example, the number of requests per second. +namespace rate { + +/// Sets the rate to a specific value. +/// +/// @param `rate` the rate to update. +/// @param `value` the value to assign to the counter. +void set(const Rate& rate, uint64_t value); + +/// Sets the rate to a specific value. +/// +/// @param `rate` the rate to update. +/// @param `tags` the distribution tags. +/// @param `value` the value to assign to the counter. +void set(const Rate& rate, const std::vector&, uint64_t value); + +} // namespace rate + +/// The `distribution` namespace provides support for statistical distribution. +/// Distribution can be useful for tracking things like response times or +/// payload sizes. 
+namespace distribution { + +/// Adds a value to the distribution. +/// +/// @param `distribution` the distribution to update. +/// @param `value` the value to add to the distribution. +void add(const Distribution& distribution, uint64_t value); + +/// Adds a value to the distribution. +/// +/// @param `distribution` the distribution to update. +/// @param `tags` the distribution tags. +/// @param `value` the value to add to the distribution. +void add(const Distribution& distribution, const std::vector& tags, + uint64_t value); + +} // namespace distribution + } // namespace datadog::telemetry diff --git a/src/datadog/common/hash.cpp b/src/datadog/common/hash.cpp new file mode 100644 index 00000000..dcf5736f --- /dev/null +++ b/src/datadog/common/hash.cpp @@ -0,0 +1,317 @@ +#include "hash.h" + +#include + +#if _MSC_VER +#pragma warning(push) +#pragma warning(disable : 4267) +#endif + +constexpr bool ALLOW_UNALIGNED_READS = true; + +namespace datadog::common { +namespace details { + +void SpookyHash::Short(const void *message, size_t length, uint64 *hash1, + uint64 *hash2) { + uint64 buf[2 * sc_numVars]; + union { + const uint8 *p8; + uint32 *p32; + uint64 *p64; + size_t i; + } u; + + u.p8 = (const uint8 *)message; + + if constexpr (!ALLOW_UNALIGNED_READS && (u.i & 0x7)) { + memcpy(buf, message, length); + u.p64 = buf; + } + + size_t remainder = length % 32; + uint64 a = *hash1; + uint64 b = *hash2; + uint64 c = sc_const; + uint64 d = sc_const; + + if (length > 15) { + const uint64 *end = u.p64 + (length / 32) * 4; + + // handle all complete sets of 32 bytes + for (; u.p64 < end; u.p64 += 4) { + c += u.p64[0]; + d += u.p64[1]; + ShortMix(a, b, c, d); + a += u.p64[2]; + b += u.p64[3]; + } + + // Handle the case of 16+ remaining bytes. 
+ if (remainder >= 16) { + c += u.p64[0]; + d += u.p64[1]; + ShortMix(a, b, c, d); + u.p64 += 2; + remainder -= 16; + } + } + + // Handle the last 0..15 bytes, and its length + d += ((uint64)length) << 56; + switch (remainder) { + case 15: + d += ((uint64)u.p8[14]) << 48; + [[fallthrough]]; + case 14: + d += ((uint64)u.p8[13]) << 40; + [[fallthrough]]; + case 13: + d += ((uint64)u.p8[12]) << 32; + [[fallthrough]]; + case 12: + d += u.p32[2]; + c += u.p64[0]; + break; + case 11: + d += ((uint64)u.p8[10]) << 16; + [[fallthrough]]; + case 10: + d += ((uint64)u.p8[9]) << 8; + [[fallthrough]]; + case 9: + d += (uint64)u.p8[8]; + [[fallthrough]]; + case 8: + c += u.p64[0]; + break; + case 7: + c += ((uint64)u.p8[6]) << 48; + [[fallthrough]]; + case 6: + c += ((uint64)u.p8[5]) << 40; + [[fallthrough]]; + case 5: + c += ((uint64)u.p8[4]) << 32; + [[fallthrough]]; + case 4: + c += u.p32[0]; + break; + case 3: + c += ((uint64)u.p8[2]) << 16; + [[fallthrough]]; + case 2: + c += ((uint64)u.p8[1]) << 8; + [[fallthrough]]; + case 1: + c += (uint64)u.p8[0]; + break; + case 0: + c += sc_const; + d += sc_const; + } + ShortEnd(a, b, c, d); + *hash1 = a; + *hash2 = b; +} + +// do the whole hash in one call +void SpookyHash::Hash128(const void *message, size_t length, uint64 *hash1, + uint64 *hash2) { + if (length < sc_bufSize) { + Short(message, length, hash1, hash2); + return; + } + + uint64 h0, h1, h2, h3, h4, h5, h6, h7, h8, h9, h10, h11; + uint64 buf[sc_numVars]; + uint64 *end; + union { + const uint8 *p8; + uint64 *p64; + size_t i; + } u; + size_t remainder; + + h0 = h3 = h6 = h9 = *hash1; + h1 = h4 = h7 = h10 = *hash2; + h2 = h5 = h8 = h11 = sc_const; + + u.p8 = (const uint8 *)message; + end = u.p64 + (length / sc_blockSize) * sc_numVars; + + // handle all whole sc_blockSize blocks of bytes + if constexpr (ALLOW_UNALIGNED_READS || ((u.i & 0x7) == 0)) { + while (u.p64 < end) { + Mix(u.p64, h0, h1, h2, h3, h4, h5, h6, h7, h8, h9, h10, h11); + u.p64 += sc_numVars; + } + } else { + 
while (u.p64 < end) { + memcpy(buf, u.p64, sc_blockSize); + Mix(buf, h0, h1, h2, h3, h4, h5, h6, h7, h8, h9, h10, h11); + u.p64 += sc_numVars; + } + } + + // handle the last partial block of sc_blockSize bytes + remainder = (length - ((const uint8 *)end - (const uint8 *)message)); + memcpy(buf, end, remainder); + memset(((uint8 *)buf) + remainder, 0, sc_blockSize - remainder); + ((uint8 *)buf)[sc_blockSize - 1] = remainder; + + // do some final mixing + End(buf, h0, h1, h2, h3, h4, h5, h6, h7, h8, h9, h10, h11); + *hash1 = h0; + *hash2 = h1; +} + +// init spooky state +void SpookyHash::Init(uint64 seed1, uint64 seed2) { + m_length = 0; + m_remainder = 0; + m_state[0] = seed1; + m_state[1] = seed2; +} + +// add a message fragment to the state +void SpookyHash::Update(const void *message, size_t length) { + uint64 h0, h1, h2, h3, h4, h5, h6, h7, h8, h9, h10, h11; + size_t newLength = length + m_remainder; + uint8 remainder; + union { + const uint8 *p8; + uint64 *p64; + size_t i; + } u; + const uint64 *end; + + // Is this message fragment too short? If it is, stuff it away. 
+ if (newLength < sc_bufSize) { + memcpy(&((uint8 *)m_data)[m_remainder], message, length); + m_length = length + m_length; + m_remainder = (uint8)newLength; + return; + } + + // init the variables + if (m_length < sc_bufSize) { + h0 = h3 = h6 = h9 = m_state[0]; + h1 = h4 = h7 = h10 = m_state[1]; + h2 = h5 = h8 = h11 = sc_const; + } else { + h0 = m_state[0]; + h1 = m_state[1]; + h2 = m_state[2]; + h3 = m_state[3]; + h4 = m_state[4]; + h5 = m_state[5]; + h6 = m_state[6]; + h7 = m_state[7]; + h8 = m_state[8]; + h9 = m_state[9]; + h10 = m_state[10]; + h11 = m_state[11]; + } + m_length = length + m_length; + + // if we've got anything stuffed away, use it now + if (m_remainder) { + uint8 prefix = sc_bufSize - m_remainder; + memcpy(&(((uint8 *)m_data)[m_remainder]), message, prefix); + u.p64 = m_data; + Mix(u.p64, h0, h1, h2, h3, h4, h5, h6, h7, h8, h9, h10, h11); + Mix(&u.p64[sc_numVars], h0, h1, h2, h3, h4, h5, h6, h7, h8, h9, h10, h11); + u.p8 = ((const uint8 *)message) + prefix; + length -= prefix; + } else { + u.p8 = (const uint8 *)message; + } + + // handle all whole blocks of sc_blockSize bytes + end = u.p64 + (length / sc_blockSize) * sc_numVars; + remainder = (uint8)(length - ((const uint8 *)end - u.p8)); + if constexpr (ALLOW_UNALIGNED_READS || (u.i & 0x7) == 0) { + while (u.p64 < end) { + Mix(u.p64, h0, h1, h2, h3, h4, h5, h6, h7, h8, h9, h10, h11); + u.p64 += sc_numVars; + } + } else { + while (u.p64 < end) { + memcpy(m_data, u.p8, sc_blockSize); + Mix(m_data, h0, h1, h2, h3, h4, h5, h6, h7, h8, h9, h10, h11); + u.p64 += sc_numVars; + } + } + + // stuff away the last few bytes + m_remainder = remainder; + memcpy(m_data, end, remainder); + + // stuff away the variables + m_state[0] = h0; + m_state[1] = h1; + m_state[2] = h2; + m_state[3] = h3; + m_state[4] = h4; + m_state[5] = h5; + m_state[6] = h6; + m_state[7] = h7; + m_state[8] = h8; + m_state[9] = h9; + m_state[10] = h10; + m_state[11] = h11; +} + +// report the hash for the concatenation of all message 
fragments so far +void SpookyHash::Final(uint64 *hash1, uint64 *hash2) { + // init the variables + if (m_length < sc_bufSize) { + *hash1 = m_state[0]; + *hash2 = m_state[1]; + Short(m_data, m_length, hash1, hash2); + return; + } + + const uint64 *data = (const uint64 *)m_data; + uint8 remainder = m_remainder; + + uint64 h0 = m_state[0]; + uint64 h1 = m_state[1]; + uint64 h2 = m_state[2]; + uint64 h3 = m_state[3]; + uint64 h4 = m_state[4]; + uint64 h5 = m_state[5]; + uint64 h6 = m_state[6]; + uint64 h7 = m_state[7]; + uint64 h8 = m_state[8]; + uint64 h9 = m_state[9]; + uint64 h10 = m_state[10]; + uint64 h11 = m_state[11]; + + if (remainder >= sc_blockSize) { + // m_data can contain two blocks; handle any whole first block + Mix(data, h0, h1, h2, h3, h4, h5, h6, h7, h8, h9, h10, h11); + data += sc_numVars; + remainder -= sc_blockSize; + } + + // mix in the last partial block, and the length mod sc_blockSize + memset(&((uint8 *)data)[remainder], 0, (sc_blockSize - remainder)); + + ((uint8 *)data)[sc_blockSize - 1] = remainder; + + // do some final mixing + End(data, h0, h1, h2, h3, h4, h5, h6, h7, h8, h9, h10, h11); + + *hash1 = h0; + *hash2 = h1; +} + +} // namespace details + +} // namespace datadog::common + +#if _MSC_VER +#pragma warning(pop) +#endif diff --git a/src/datadog/common/hash.h b/src/datadog/common/hash.h new file mode 100644 index 00000000..3d2155d8 --- /dev/null +++ b/src/datadog/common/hash.h @@ -0,0 +1,448 @@ +#pragma once + +#include + +// clang-format off +#ifdef _MSC_VER +# define INLINE __forceinline + typedef unsigned __int64 uint64; + typedef unsigned __int32 uint32; + typedef unsigned __int16 uint16; + typedef unsigned __int8 uint8; +#else +# define INLINE inline +# include + typedef uint64_t uint64; + typedef uint32_t uint32; + typedef uint16_t uint16; + typedef uint8_t uint8; +#endif +// clang-format on + +namespace datadog::common { +namespace details { +// +// SpookyHash: a 128-bit noncryptographic hash function +// By Bob Jenkins, public 
domain +// Oct 31 2010: alpha, framework + SpookyHash::Mix appears right +// Oct 31 2011: alpha again, Mix only good to 2^^69 but rest appears right +// Dec 31 2011: beta, improved Mix, tested it for 2-bit deltas +// Feb 2 2012: production, same bits as beta +// Feb 5 2012: adjusted definitions of uint* to be more portable +// Mar 30 2012: 3 bytes/cycle, not 4. Alpha was 4 but wasn't thorough enough. +// August 5 2012: SpookyV2 (different results) +// +// Up to 3 bytes/cycle for long messages. Reasonably fast for short messages. +// All 1 or 2 bit deltas achieve avalanche within 1% bias per output bit. +// +// This was developed for and tested on 64-bit x86-compatible processors. +// It assumes the processor is little-endian. There is a macro +// controlling whether unaligned reads are allowed (by default they are). +// This should be an equally good hash on big-endian machines, but it will +// compute different results on them than on little-endian machines. +// +// Google's CityHash has similar specs to SpookyHash, and CityHash is faster +// on new Intel boxes. MD4 and MD5 also have similar specs, but they are orders +// of magnitude slower. CRCs are two or more times slower, but unlike +// SpookyHash, they have nice math for combining the CRCs of pieces to form +// the CRCs of wholes. There are also cryptographic hashes, but those are even +// slower than MD5. 
+// +// Source: +class SpookyHash { + public: + // + // SpookyHash: hash a single message in one call, produce 128-bit output + // + static void Hash128(const void *message, // message to hash + size_t length, // length of message in bytes + uint64 *hash1, // in/out: in seed 1, out hash value 1 + uint64 *hash2); // in/out: in seed 2, out hash value 2 + + // + // Hash64: hash a single message in one call, return 64-bit output + // + static uint64 Hash64(const void *message, // message to hash + size_t length, // length of message in bytes + uint64 seed) // seed + { + uint64 hash1 = seed; + Hash128(message, length, &hash1, &seed); + return hash1; + } + + // + // Hash32: hash a single message in one call, produce 32-bit output + // + static uint32 Hash32(const void *message, // message to hash + size_t length, // length of message in bytes + uint32 seed) // seed + { + uint64 hash1 = seed, hash2 = seed; + Hash128(message, length, &hash1, &hash2); + return (uint32)hash1; + } + + // + // Init: initialize the context of a SpookyHash + // + void Init(uint64 seed1, // any 64-bit value will do, including 0 + uint64 seed2); // different seeds produce independent hashes + + // + // Update: add a piece of a message to a SpookyHash state + // + void Update(const void *message, // message fragment + size_t length); // length of message fragment in bytes + + // + // Final: compute the hash for the current SpookyHash state + // + // This does not modify the state; you can keep updating it afterward + // + // The result is the same as if SpookyHash() had been called with + // all the pieces concatenated into one message. + // + void Final(uint64 *hash1, // out only: first 64 bits of hash value. + uint64 *hash2); // out only: second 64 bits of hash value. + + // + // left rotate a 64-bit value by k bytes + // + static INLINE uint64 Rot64(uint64 x, int k) { + return (x << k) | (x >> (64 - k)); + } + + // + // This is used if the input is 96 bytes long or longer. 
+ // + // The internal state is fully overwritten every 96 bytes. + // Every input bit appears to cause at least 128 bits of entropy + // before 96 other bytes are combined, when run forward or backward + // For every input bit, + // Two inputs differing in just that input bit + // Where "differ" means xor or subtraction + // And the base value is random + // When run forward or backwards one Mix + // I tried 3 pairs of each; they all differed by at least 212 bits. + // + static INLINE void Mix(const uint64 *data, uint64 &s0, uint64 &s1, uint64 &s2, + uint64 &s3, uint64 &s4, uint64 &s5, uint64 &s6, + uint64 &s7, uint64 &s8, uint64 &s9, uint64 &s10, + uint64 &s11) { + s0 += data[0]; + s2 ^= s10; + s11 ^= s0; + s0 = Rot64(s0, 11); + s11 += s1; + s1 += data[1]; + s3 ^= s11; + s0 ^= s1; + s1 = Rot64(s1, 32); + s0 += s2; + s2 += data[2]; + s4 ^= s0; + s1 ^= s2; + s2 = Rot64(s2, 43); + s1 += s3; + s3 += data[3]; + s5 ^= s1; + s2 ^= s3; + s3 = Rot64(s3, 31); + s2 += s4; + s4 += data[4]; + s6 ^= s2; + s3 ^= s4; + s4 = Rot64(s4, 17); + s3 += s5; + s5 += data[5]; + s7 ^= s3; + s4 ^= s5; + s5 = Rot64(s5, 28); + s4 += s6; + s6 += data[6]; + s8 ^= s4; + s5 ^= s6; + s6 = Rot64(s6, 39); + s5 += s7; + s7 += data[7]; + s9 ^= s5; + s6 ^= s7; + s7 = Rot64(s7, 57); + s6 += s8; + s8 += data[8]; + s10 ^= s6; + s7 ^= s8; + s8 = Rot64(s8, 55); + s7 += s9; + s9 += data[9]; + s11 ^= s7; + s8 ^= s9; + s9 = Rot64(s9, 54); + s8 += s10; + s10 += data[10]; + s0 ^= s8; + s9 ^= s10; + s10 = Rot64(s10, 22); + s9 += s11; + s11 += data[11]; + s1 ^= s9; + s10 ^= s11; + s11 = Rot64(s11, 46); + s10 += s0; + } + + // + // Mix all 12 inputs together so that h0, h1 are a hash of them all. 
+ // + // For two inputs differing in just the input bits + // Where "differ" means xor or subtraction + // And the base value is random, or a counting value starting at that bit + // The final result will have each bit of h0, h1 flip + // For every input bit, + // with probability 50 +- .3% + // For every pair of input bits, + // with probability 50 +- 3% + // + // This does not rely on the last Mix() call having already mixed some. + // Two iterations was almost good enough for a 64-bit result, but a + // 128-bit result is reported, so End() does three iterations. + // + static INLINE void EndPartial(uint64 &h0, uint64 &h1, uint64 &h2, uint64 &h3, + uint64 &h4, uint64 &h5, uint64 &h6, uint64 &h7, + uint64 &h8, uint64 &h9, uint64 &h10, + uint64 &h11) { + h11 += h1; + h2 ^= h11; + h1 = Rot64(h1, 44); + h0 += h2; + h3 ^= h0; + h2 = Rot64(h2, 15); + h1 += h3; + h4 ^= h1; + h3 = Rot64(h3, 34); + h2 += h4; + h5 ^= h2; + h4 = Rot64(h4, 21); + h3 += h5; + h6 ^= h3; + h5 = Rot64(h5, 38); + h4 += h6; + h7 ^= h4; + h6 = Rot64(h6, 33); + h5 += h7; + h8 ^= h5; + h7 = Rot64(h7, 10); + h6 += h8; + h9 ^= h6; + h8 = Rot64(h8, 13); + h7 += h9; + h10 ^= h7; + h9 = Rot64(h9, 38); + h8 += h10; + h11 ^= h8; + h10 = Rot64(h10, 53); + h9 += h11; + h0 ^= h9; + h11 = Rot64(h11, 42); + h10 += h0; + h1 ^= h10; + h0 = Rot64(h0, 54); + } + + static INLINE void End(const uint64 *data, uint64 &h0, uint64 &h1, uint64 &h2, + uint64 &h3, uint64 &h4, uint64 &h5, uint64 &h6, + uint64 &h7, uint64 &h8, uint64 &h9, uint64 &h10, + uint64 &h11) { + h0 += data[0]; + h1 += data[1]; + h2 += data[2]; + h3 += data[3]; + h4 += data[4]; + h5 += data[5]; + h6 += data[6]; + h7 += data[7]; + h8 += data[8]; + h9 += data[9]; + h10 += data[10]; + h11 += data[11]; + EndPartial(h0, h1, h2, h3, h4, h5, h6, h7, h8, h9, h10, h11); + EndPartial(h0, h1, h2, h3, h4, h5, h6, h7, h8, h9, h10, h11); + EndPartial(h0, h1, h2, h3, h4, h5, h6, h7, h8, h9, h10, h11); + } + + // + // The goal is for each bit of the input to expand 
into 128 bits of + // apparent entropy before it is fully overwritten. + // n trials both set and cleared at least m bits of h0 h1 h2 h3 + // n: 2 m: 29 + // n: 3 m: 46 + // n: 4 m: 57 + // n: 5 m: 107 + // n: 6 m: 146 + // n: 7 m: 152 + // when run forwards or backwards + // for all 1-bit and 2-bit diffs + // with diffs defined by either xor or subtraction + // with a base of all zeros plus a counter, or plus another bit, or random + // + static INLINE void ShortMix(uint64 &h0, uint64 &h1, uint64 &h2, uint64 &h3) { + h2 = Rot64(h2, 50); + h2 += h3; + h0 ^= h2; + h3 = Rot64(h3, 52); + h3 += h0; + h1 ^= h3; + h0 = Rot64(h0, 30); + h0 += h1; + h2 ^= h0; + h1 = Rot64(h1, 41); + h1 += h2; + h3 ^= h1; + h2 = Rot64(h2, 54); + h2 += h3; + h0 ^= h2; + h3 = Rot64(h3, 48); + h3 += h0; + h1 ^= h3; + h0 = Rot64(h0, 38); + h0 += h1; + h2 ^= h0; + h1 = Rot64(h1, 37); + h1 += h2; + h3 ^= h1; + h2 = Rot64(h2, 62); + h2 += h3; + h0 ^= h2; + h3 = Rot64(h3, 34); + h3 += h0; + h1 ^= h3; + h0 = Rot64(h0, 5); + h0 += h1; + h2 ^= h0; + h1 = Rot64(h1, 36); + h1 += h2; + h3 ^= h1; + } + + // + // Mix all 4 inputs together so that h0, h1 are a hash of them all. 
+ // + // For two inputs differing in just the input bits + // Where "differ" means xor or subtraction + // And the base value is random, or a counting value starting at that bit + // The final result will have each bit of h0, h1 flip + // For every input bit, + // with probability 50 +- .3% (it is probably better than that) + // For every pair of input bits, + // with probability 50 +- .75% (the worst case is approximately that) + // + static INLINE void ShortEnd(uint64 &h0, uint64 &h1, uint64 &h2, uint64 &h3) { + h3 ^= h2; + h2 = Rot64(h2, 15); + h3 += h2; + h0 ^= h3; + h3 = Rot64(h3, 52); + h0 += h3; + h1 ^= h0; + h0 = Rot64(h0, 26); + h1 += h0; + h2 ^= h1; + h1 = Rot64(h1, 51); + h2 += h1; + h3 ^= h2; + h2 = Rot64(h2, 28); + h3 += h2; + h0 ^= h3; + h3 = Rot64(h3, 9); + h0 += h3; + h1 ^= h0; + h0 = Rot64(h0, 47); + h1 += h0; + h2 ^= h1; + h1 = Rot64(h1, 54); + h2 += h1; + h3 ^= h2; + h2 = Rot64(h2, 32); + h3 += h2; + h0 ^= h3; + h3 = Rot64(h3, 25); + h0 += h3; + h1 ^= h0; + h0 = Rot64(h0, 63); + h1 += h0; + } + + private: + // + // Short is used for messages under 192 bytes in length + // Short has a low startup cost, the normal mode is good for long + // keys, the cost crossover is at about 192 bytes. The two modes were + // held to the same quality bar. 
+ // + static void Short( + const void *message, // message (array of bytes, not necessarily aligned) + size_t length, // length of message (in bytes) + uint64 *hash1, // in/out: in the seed, out the hash value + uint64 *hash2); // in/out: in the seed, out the hash value + + // number of uint64's in internal state + static const size_t sc_numVars = 12; + + // size of the internal state + static const size_t sc_blockSize = sc_numVars * 8; + + // size of buffer of unhashed data, in bytes + static const size_t sc_bufSize = 2 * sc_blockSize; + + // + // sc_const: a constant which: + // * is not zero + // * is odd + // * is a not-very-regular mix of 1's and 0's + // * does not need any other special mathematical properties + // + static const uint64 sc_const = 0xdeadbeefdeadbeefULL; + + uint64 m_data[2 * sc_numVars]; // unhashed data, for partial messages + uint64 m_state[sc_numVars]; // internal state of the hash + size_t m_length; // total length of the input so far + uint8 m_remainder; // length of unhashed data stashed in m_data +}; +} // namespace details + +/// A fast non-cryptographic 64-bit hash wrapper based on SpookyHash. +/// +/// This class provides a convenient interface for incremental hashing using +/// the SpookyHash algorithm. +class FastHash final { + details::SpookyHash hasher_; + + public: + /// Constructs a new FastHash instance with a given seed. + /// + /// @param seed A 64-bit seed to initialize the hash state. + FastHash(uint64 seed) { hasher_.Init(seed, 1528); } + + /// Appends data to the hash. + /// + /// This method can be called multiple times to hash input data in chunks. + /// + /// @param data Pointer to the data to hash. + /// @param length Size of the data in bytes. + void append(const void *data, size_t length) { hasher_.Update(data, length); } + + /// Finalizes the hash computation and returns a 64-bit hash value. 
+ /// + /// This method finalizes the internal state and returns the first + /// 64 bits of the 128-bit SpookyHash output. + /// + /// @return A 64-bit hash value. + uint64 final() { + uint64 res[2]; + hasher_.Final(&res[0], &res[1]); + return res[0]; + } +}; + +} // namespace datadog::common diff --git a/src/datadog/datadog_agent.cpp b/src/datadog/datadog_agent.cpp index af925797..e3fb611e 100644 --- a/src/datadog/datadog_agent.cpp +++ b/src/datadog/datadog_agent.cpp @@ -18,6 +18,7 @@ #include "json.hpp" #include "msgpack.h" #include "span_data.h" +#include "telemetry_metrics.h" #include "trace_sampler.h" namespace datadog { @@ -252,15 +253,20 @@ void DatadogAgent::flush() { const DictReader& /*response_headers*/, std::string response_body) { if (response_status >= 500) { - telemetry::metrics().trace_api.responses_5xx.inc(); + telemetry::counter::increment(metrics::tracer::api::responses, + {"status_code:5xx"}); } else if (response_status >= 400) { - telemetry::metrics().trace_api.responses_4xx.inc(); + telemetry::counter::increment(metrics::tracer::api::responses, + {"status_code:4xx"}); } else if (response_status >= 300) { - telemetry::metrics().trace_api.responses_3xx.inc(); + telemetry::counter::increment(metrics::tracer::api::responses, + {"status_code:3xx"}); } else if (response_status >= 200) { - telemetry::metrics().trace_api.responses_2xx.inc(); + telemetry::counter::increment(metrics::tracer::api::responses, + {"status_code:2xx"}); } else if (response_status >= 100) { - telemetry::metrics().trace_api.responses_1xx.inc(); + telemetry::counter::increment(metrics::tracer::api::responses, + {"status_code:1xx"}); } if (response_status != 200) { logger->log_error([&](auto& stream) { @@ -298,12 +304,13 @@ void DatadogAgent::flush() { // request or retrieving the response. It's invoked // asynchronously. 
auto on_error = [logger = logger_](Error error) { - telemetry::metrics().trace_api.errors_network.inc(); + telemetry::counter::increment(metrics::tracer::api::errors, + {"type:network"}); logger->log_error(error.with_prefix( "Error occurred during HTTP request for submitting traces: ")); }; - telemetry::metrics().trace_api.requests.inc(); + telemetry::counter::increment(metrics::tracer::api::requests); auto post_result = http_client_->post(traces_endpoint_, std::move(set_request_headers), std::move(body), std::move(on_response), diff --git a/src/datadog/telemetry/metric_context.h b/src/datadog/telemetry/metric_context.h new file mode 100644 index 00000000..f0a13387 --- /dev/null +++ b/src/datadog/telemetry/metric_context.h @@ -0,0 +1,67 @@ +#pragma once + +#include + +#include "common/hash.h" + +namespace datadog::telemetry { + +// TODO: `enable_if_t` is same `Metric` + +/// `MetricContext` is a unique identifier for metrics. +/// It depends on the kind of metric, its name, scope, common and the set of +/// tags. +template +struct MetricContext final { + /// The metric definition. + Metric id; + /// Set of tags. 
+ std::vector tags; + + std::size_t hash() const { + common::FastHash h(3028); + h.append(id.name.data(), id.name.size()); + h.append(id.scope.data(), id.scope.size()); + h.append(&id.common, sizeof(id.common)); + for (const auto& t : tags) { + h.append(t.data(), t.size()); + } + return static_cast(h.final()); + } + + bool operator==(const MetricContext& rhs) const { + return id.name == rhs.id.name && id.scope == rhs.id.scope && + id.common == rhs.id.common && tags == rhs.tags; + } +}; + +} // namespace datadog::telemetry + +template <> +struct std::hash< + datadog::telemetry::MetricContext> { + std::size_t operator()( + const datadog::telemetry::MetricContext& + counter) const { + return counter.hash(); + } +}; + +template <> +struct std::hash> { + std::size_t operator()( + const datadog::telemetry::MetricContext& rate) + const { + return rate.hash(); + } +}; + +template <> +struct std::hash< + datadog::telemetry::MetricContext> { + std::size_t operator()( + const datadog::telemetry::MetricContext& + distribution) const { + return distribution.hash(); + } +}; diff --git a/src/datadog/telemetry/metrics.cpp b/src/datadog/telemetry/metrics.cpp deleted file mode 100644 index 8140cad3..00000000 --- a/src/datadog/telemetry/metrics.cpp +++ /dev/null @@ -1,43 +0,0 @@ -#include - -namespace datadog { -namespace telemetry { - -Metric::Metric(std::string name, std::string type, std::string scope, - std::vector tags, bool common) - : name_(std::move(name)), - type_(std::move(type)), - scope_(std::move(scope)), - tags_(std::move(tags)), - common_(common) {} -std::string Metric::name() { return name_; } -std::string Metric::type() { return type_; } -std::string Metric::scope() { return scope_; } -std::vector Metric::tags() { return tags_; } -bool Metric::common() { return common_; } -uint64_t Metric::value() { return value_; } -uint64_t Metric::capture_and_reset_value() { return value_.exchange(0); } - -CounterMetric::CounterMetric(std::string name, std::string scope, - 
std::vector tags, bool common) - : Metric(name, "count", scope, tags, common) {} -void CounterMetric::inc() { add(1); } -void CounterMetric::add(uint64_t amount) { value_ += amount; } - -GaugeMetric::GaugeMetric(std::string name, std::string scope, - std::vector tags, bool common) - : Metric(name, "gauge", scope, tags, common) {} -void GaugeMetric::set(uint64_t value) { value_ = value; } -void GaugeMetric::inc() { add(1); } -void GaugeMetric::add(uint64_t amount) { value_ += amount; } -void GaugeMetric::dec() { sub(1); } -void GaugeMetric::sub(uint64_t amount) { - if (amount > value_) { - value_ = 0; - } else { - value_ -= amount; - } -} - -} // namespace telemetry -} // namespace datadog diff --git a/src/datadog/telemetry/telemetry.cpp b/src/datadog/telemetry/telemetry.cpp index 2c03f6be..7661edf1 100644 --- a/src/datadog/telemetry/telemetry.cpp +++ b/src/datadog/telemetry/telemetry.cpp @@ -24,15 +24,11 @@ using NoopTelemetry = std::monostate; /// implementation. using TelemetryProxy = std::variant; -// NOTE(@dmehala): until metrics handling is improved. -static DefaultMetrics noop_metrics; - /// NOTE(@dmehala): Here to facilitate Meyer's singleton construction. 
struct Ctor_param final { FinalizedConfiguration configuration; std::shared_ptr logger; std::shared_ptr client; - std::vector> metrics; std::shared_ptr scheduler; tracing::HTTPClient::URL agent_url; tracing::Clock clock = tracing::default_clock; @@ -41,8 +37,7 @@ struct Ctor_param final { TelemetryProxy make_telemetry(const Ctor_param& init) { if (!init.configuration.enabled) return NoopTelemetry{}; return Telemetry{init.configuration, init.logger, init.client, - init.metrics, init.scheduler, init.agent_url, - init.clock}; + init.scheduler, init.agent_url, init.clock}; } TelemetryProxy& instance( @@ -54,11 +49,10 @@ TelemetryProxy& instance( void init(FinalizedConfiguration configuration, std::shared_ptr logger, std::shared_ptr client, - std::vector> metrics, std::shared_ptr event_scheduler, tracing::HTTPClient::URL agent_url, tracing::Clock clock) { - instance(Ctor_param{configuration, logger, client, metrics, event_scheduler, - agent_url, clock}); + instance(Ctor_param{configuration, logger, client, event_scheduler, agent_url, + clock}); } void send_configuration_change() { @@ -81,15 +75,6 @@ void capture_configuration_change( instance()); } -DefaultMetrics& metrics() { - auto& proxy = instance(); - if (std::holds_alternative(proxy)) { - return noop_metrics; - } else { - return std::get(proxy).metrics(); - } -} - void report_warning_log(std::string message) { std::visit(details::Overload{ [&](Telemetry& telemetry) { telemetry.log_warning(message); }, @@ -116,4 +101,110 @@ void report_error_log(std::string message, std::string stacktrace) { instance()); } +namespace counter { +void increment(const Counter& counter) { + std::visit( + details::Overload{ + [&](Telemetry& telemetry) { telemetry.increment_counter(counter); }, + [](auto&&) {}, + }, + instance()); +} + +void increment(const Counter& counter, const std::vector& tags) { + std::visit(details::Overload{ + [&](Telemetry& telemetry) { + telemetry.increment_counter(counter, tags); + }, + [](auto&&) {}, + }, + 
instance()); +} + +void decrement(const Counter& counter) { + std::visit( + details::Overload{ + [&](Telemetry& telemetry) { telemetry.decrement_counter(counter); }, + [](auto&&) {}, + }, + instance()); +} + +void decrement(const Counter& counter, const std::vector& tags) { + std::visit(details::Overload{ + [&](Telemetry& telemetry) { + telemetry.decrement_counter(counter, tags); + }, + [](auto&&) {}, + }, + instance()); +} + +void set(const Counter& counter, uint64_t value) { + std::visit( + details::Overload{ + [&](Telemetry& telemetry) { telemetry.set_counter(counter, value); }, + [](auto&&) {}, + }, + instance()); +} + +void set(const Counter& counter, const std::vector& tags, + uint64_t value) { + std::visit(details::Overload{ + [&](Telemetry& telemetry) { + telemetry.set_counter(counter, tags, value); + }, + [](auto&&) {}, + }, + instance()); +} + +} // namespace counter + +namespace rate { +void set(const Rate& rate, uint64_t value) { + std::visit(details::Overload{ + [&](Telemetry& telemetry) { telemetry.set_rate(rate, value); }, + [](auto&&) {}, + }, + instance()); +} + +void set(const Rate& rate, const std::vector& tags, + uint64_t value) { + std::visit( + details::Overload{ + [&](Telemetry& telemetry) { telemetry.set_rate(rate, tags, value); }, + [](auto&&) {}, + }, + instance()); +} +} // namespace rate + +namespace distribution { + +void add(const Distribution& distribution, uint64_t value) { + std::visit(details::Overload{ + [&](Telemetry& telemetry) { + telemetry.add_datapoint(distribution, value); + }, + [](auto&&) {}, + }, + instance()); +} + +void add(const Distribution& distribution, const std::vector& tags, + uint64_t value) { + std::visit(details::Overload{ + [&](Telemetry& telemetry) { + telemetry.add_datapoint(distribution, tags, value); + }, + [](auto&&) {}, + }, + instance()); +} + +} // namespace distribution + } // namespace datadog::telemetry diff --git a/src/datadog/telemetry/telemetry_impl.cpp b/src/datadog/telemetry/telemetry_impl.cpp 
index ad6ec9c5..3fb7393f 100644 --- a/src/datadog/telemetry/telemetry_impl.cpp +++ b/src/datadog/telemetry/telemetry_impl.cpp @@ -88,12 +88,74 @@ nlohmann::json encode_log(const telemetry::LogMessage& log) { return encoded; } +std::string_view to_string(details::MetricType type) { + using namespace datadog::telemetry::details; + switch (type) { + case MetricType::counter: + return "count"; + case MetricType::rate: + return "rate"; + case MetricType::distribution: + return "distribution"; + } + + return ""; +} + +// TODO: do `enable_if` +template +void encode_metrics( + nlohmann::json::array_t& metrics, + const std::unordered_map, telemetry::MetricSnapshot>& + counters_snapshots) { + for (const auto& [metric_ctx, snapshots] : counters_snapshots) { + auto encoded = nlohmann::json{ + {"metric", metric_ctx.id.name}, + {"type", to_string(metric_ctx.id.type)}, + {"common", metric_ctx.id.common}, + {"namespace", metric_ctx.id.scope}, + }; + + if (!metric_ctx.tags.empty()) { + encoded.emplace("tags", metric_ctx.tags); + } + + auto points = nlohmann::json::array(); + for (const auto& [timestamp, value] : snapshots) { + points.emplace_back(nlohmann::json::array({timestamp, value})); + } + + encoded.emplace("points", points); + metrics.emplace_back(std::move(encoded)); + } +} + +nlohmann::json encode_distributions( + const std::unordered_map, + std::vector>& distributions) { + auto j = nlohmann::json::array(); + + for (const auto& [metric_ctx, values] : distributions) { + auto series = nlohmann::json{ + {"metric", metric_ctx.id.name}, + {"common", metric_ctx.id.common}, + {"namespace", metric_ctx.id.scope}, + {"points", values}, + }; + if (!metric_ctx.tags.empty()) { + series.emplace("tags", metric_ctx.tags); + } + j.emplace_back(series); + } + + return j; +} + } // namespace Telemetry::Telemetry(FinalizedConfiguration config, std::shared_ptr logger, std::shared_ptr client, - std::vector> user_metrics, std::shared_ptr event_scheduler, HTTPClient::URL agent_url, Clock clock) : 
config_(std::move(config)), @@ -104,44 +166,7 @@ Telemetry::Telemetry(FinalizedConfiguration config, http_client_(client), clock_(std::move(clock)), scheduler_(event_scheduler), - user_metrics_(std::move(user_metrics)), host_info_(get_host_info()) { - // Register all the metrics that we're tracking by adding them to the - // metrics_snapshots_ container. This allows for simpler iteration logic - // when using the values in `generate-metrics` messages. - metrics_snapshots_.emplace_back(metrics_.tracer.spans_created, - MetricSnapshot{}); - metrics_snapshots_.emplace_back(metrics_.tracer.spans_finished, - MetricSnapshot{}); - metrics_snapshots_.emplace_back(metrics_.tracer.trace_segments_created_new, - MetricSnapshot{}); - metrics_snapshots_.emplace_back( - metrics_.tracer.trace_segments_created_continued, MetricSnapshot{}); - metrics_snapshots_.emplace_back(metrics_.tracer.trace_segments_closed, - MetricSnapshot{}); - metrics_snapshots_.emplace_back(metrics_.trace_api.requests, - MetricSnapshot{}); - metrics_snapshots_.emplace_back(metrics_.trace_api.responses_1xx, - MetricSnapshot{}); - metrics_snapshots_.emplace_back(metrics_.trace_api.responses_2xx, - MetricSnapshot{}); - metrics_snapshots_.emplace_back(metrics_.trace_api.responses_3xx, - MetricSnapshot{}); - metrics_snapshots_.emplace_back(metrics_.trace_api.responses_4xx, - MetricSnapshot{}); - metrics_snapshots_.emplace_back(metrics_.trace_api.responses_5xx, - MetricSnapshot{}); - metrics_snapshots_.emplace_back(metrics_.trace_api.errors_timeout, - MetricSnapshot{}); - metrics_snapshots_.emplace_back(metrics_.trace_api.errors_network, - MetricSnapshot{}); - metrics_snapshots_.emplace_back(metrics_.trace_api.errors_status_code, - MetricSnapshot{}); - - for (auto& m : user_metrics_) { - metrics_snapshots_.emplace_back(*m, MetricSnapshot{}); - } - // Callback for successful telemetry HTTP requests, to examine HTTP // status. 
telemetry_on_response_ = [logger = logger_]( @@ -191,8 +216,7 @@ Telemetry::~Telemetry() { } Telemetry::Telemetry(Telemetry&& rhs) - : metrics_(std::move(rhs.metrics_)), - config_(std::move(rhs.config_)), + : config_(std::move(rhs.config_)), logger_(std::move(rhs.logger_)), telemetry_on_response_(std::move(rhs.telemetry_on_response_)), telemetry_on_error_(std::move(rhs.telemetry_on_error_)), @@ -201,56 +225,16 @@ Telemetry::Telemetry(Telemetry&& rhs) http_client_(rhs.http_client_), clock_(std::move(rhs.clock_)), scheduler_(std::move(rhs.scheduler_)), - user_metrics_(std::move(rhs.user_metrics_)), seq_id_(rhs.seq_id_), config_seq_ids_(rhs.config_seq_ids_), host_info_(rhs.host_info_) { cancel_tasks(rhs.tasks_); - - // Register all the metrics that we're tracking by adding them to the - // metrics_snapshots_ container. This allows for simpler iteration logic - // when using the values in `generate-metrics` messages. - metrics_snapshots_.emplace_back(metrics_.tracer.spans_created, - MetricSnapshot{}); - metrics_snapshots_.emplace_back(metrics_.tracer.spans_finished, - MetricSnapshot{}); - metrics_snapshots_.emplace_back(metrics_.tracer.trace_segments_created_new, - MetricSnapshot{}); - metrics_snapshots_.emplace_back( - metrics_.tracer.trace_segments_created_continued, MetricSnapshot{}); - metrics_snapshots_.emplace_back(metrics_.tracer.trace_segments_closed, - MetricSnapshot{}); - metrics_snapshots_.emplace_back(metrics_.trace_api.requests, - MetricSnapshot{}); - metrics_snapshots_.emplace_back(metrics_.trace_api.responses_1xx, - MetricSnapshot{}); - metrics_snapshots_.emplace_back(metrics_.trace_api.responses_2xx, - MetricSnapshot{}); - metrics_snapshots_.emplace_back(metrics_.trace_api.responses_3xx, - MetricSnapshot{}); - metrics_snapshots_.emplace_back(metrics_.trace_api.responses_4xx, - MetricSnapshot{}); - metrics_snapshots_.emplace_back(metrics_.trace_api.responses_5xx, - MetricSnapshot{}); - metrics_snapshots_.emplace_back(metrics_.trace_api.errors_timeout, - 
MetricSnapshot{}); - metrics_snapshots_.emplace_back(metrics_.trace_api.errors_network, - MetricSnapshot{}); - metrics_snapshots_.emplace_back(metrics_.trace_api.errors_status_code, - MetricSnapshot{}); - - for (auto& m : user_metrics_) { - metrics_snapshots_.emplace_back(*m, MetricSnapshot{}); - } - schedule_tasks(); } Telemetry& Telemetry::operator=(Telemetry&& rhs) { if (&rhs != this) { cancel_tasks(rhs.tasks_); - - std::swap(metrics_, rhs.metrics_); std::swap(config_, rhs.config_); std::swap(logger_, rhs.logger_); std::swap(telemetry_on_response_, rhs.telemetry_on_response_); @@ -260,47 +244,9 @@ Telemetry& Telemetry::operator=(Telemetry&& rhs) { std::swap(tracer_signature_, rhs.tracer_signature_); std::swap(http_client_, rhs.http_client_); std::swap(clock_, rhs.clock_); - std::swap(user_metrics_, rhs.user_metrics_); std::swap(seq_id_, rhs.seq_id_); std::swap(config_seq_ids_, rhs.config_seq_ids_); std::swap(host_info_, rhs.host_info_); - - // Register all the metrics that we're tracking by adding them to the - // metrics_snapshots_ container. This allows for simpler iteration logic - // when using the values in `generate-metrics` messages. 
- metrics_snapshots_.emplace_back(metrics_.tracer.spans_created, - MetricSnapshot{}); - metrics_snapshots_.emplace_back(metrics_.tracer.spans_finished, - MetricSnapshot{}); - metrics_snapshots_.emplace_back(metrics_.tracer.trace_segments_created_new, - MetricSnapshot{}); - metrics_snapshots_.emplace_back( - metrics_.tracer.trace_segments_created_continued, MetricSnapshot{}); - metrics_snapshots_.emplace_back(metrics_.tracer.trace_segments_closed, - MetricSnapshot{}); - metrics_snapshots_.emplace_back(metrics_.trace_api.requests, - MetricSnapshot{}); - metrics_snapshots_.emplace_back(metrics_.trace_api.responses_1xx, - MetricSnapshot{}); - metrics_snapshots_.emplace_back(metrics_.trace_api.responses_2xx, - MetricSnapshot{}); - metrics_snapshots_.emplace_back(metrics_.trace_api.responses_3xx, - MetricSnapshot{}); - metrics_snapshots_.emplace_back(metrics_.trace_api.responses_4xx, - MetricSnapshot{}); - metrics_snapshots_.emplace_back(metrics_.trace_api.responses_5xx, - MetricSnapshot{}); - metrics_snapshots_.emplace_back(metrics_.trace_api.errors_timeout, - MetricSnapshot{}); - metrics_snapshots_.emplace_back(metrics_.trace_api.errors_network, - MetricSnapshot{}); - metrics_snapshots_.emplace_back(metrics_.trace_api.errors_status_code, - MetricSnapshot{}); - - for (auto& m : user_metrics_) { - metrics_snapshots_.emplace_back(*m, MetricSnapshot{}); - } - schedule_tasks(); } return *this; @@ -379,37 +325,28 @@ std::string Telemetry::heartbeat_and_telemetry() { }); batch_payloads.emplace_back(std::move(heartbeat)); - auto metrics = nlohmann::json::array(); - for (auto& m : metrics_snapshots_) { - auto& metric = m.first.get(); - auto& points = m.second; - if (!points.empty()) { - auto type = metric.type(); - if (type == "count") { - metrics.emplace_back(nlohmann::json::object({ - {"metric", metric.name()}, - {"tags", metric.tags()}, - {"type", metric.type()}, - {"points", points}, - {"namespace", metric.scope()}, - {"common", metric.common()}, - })); - } else if (type == 
"gauge") { - // gauge metrics have a interval - metrics.emplace_back(nlohmann::json::object({ - {"metric", metric.name()}, - {"tags", metric.tags()}, - {"type", metric.type()}, - {"namespace", metric.scope()}, - {"interval", 10}, - {"points", points}, - {"common", metric.common()}, - })); - } - } - points.clear(); + std::unordered_map, std::vector> + distributions; + { + std::lock_guard l{distributions_mutex_}; + std::swap(distributions_, distributions); + } + + std::unordered_map, MetricSnapshot> counters_snapshot; + { + std::lock_guard l{counter_mutex_}; + std::swap(counters_snapshot_, counters_snapshot); } + std::unordered_map, MetricSnapshot> rates_snapshot; + { + std::lock_guard l{rate_mutex_}; + std::swap(rates_snapshot_, rates_snapshot); + } + + nlohmann::json::array_t metrics = nlohmann::json::array(); + encode_metrics(metrics, counters_snapshot); + encode_metrics(metrics, rates_snapshot); if (!metrics.empty()) { auto generate_metrics = nlohmann::json::object({ {"request_type", "generate-metrics"}, @@ -420,6 +357,20 @@ std::string Telemetry::heartbeat_and_telemetry() { batch_payloads.emplace_back(std::move(generate_metrics)); } + if (auto distributions_series = encode_distributions(distributions); + !distributions.empty()) { + auto distributions_json = nlohmann::json{ + {"request_type", "distributions"}, + { + "payload", + nlohmann::json{ + {"series", distributions_series}, + }, + }, + }; + batch_payloads.emplace_back(std::move(distributions_json)); + } + if (!logs_.empty()) { auto encoded_logs = nlohmann::json::array(); for (const auto& log : logs_) { @@ -456,35 +407,6 @@ std::string Telemetry::app_closing() { batch_payloads.emplace_back(std::move(app_closing)); auto metrics = nlohmann::json::array(); - for (auto& m : metrics_snapshots_) { - auto& metric = m.first.get(); - auto& points = m.second; - if (!points.empty()) { - auto type = metric.type(); - if (type == "count") { - metrics.emplace_back(nlohmann::json::object({ - {"metric", metric.name()}, - 
{"tags", metric.tags()}, - {"type", metric.type()}, - {"points", points}, - {"common", metric.common()}, - {"namespace", metric.scope()}, - })); - } else if (type == "gauge") { - // gauge metrics have a interval - metrics.emplace_back(nlohmann::json::object({ - {"metric", metric.name()}, - {"tags", metric.tags()}, - {"type", metric.type()}, - {"interval", 10}, - {"points", points}, - {"common", metric.common()}, - {"namespace", metric.scope()}, - })); - } - } - points.clear(); - } if (!metrics.empty()) { auto generate_metrics = nlohmann::json::object({ @@ -701,12 +623,27 @@ void Telemetry::capture_metrics() { std::time_t timepoint = std::chrono::duration_cast( clock_().wall.time_since_epoch()) .count(); - for (auto& m : metrics_snapshots_) { - auto value = m.first.get().capture_and_reset_value(); - if (value == 0) { - continue; - } - m.second.emplace_back(timepoint, value); + + std::unordered_map, uint64_t> counter_snapshot; + { + std::lock_guard l{counter_mutex_}; + std::swap(counter_snapshot, counters_); + } + + for (auto& [counter, value] : counter_snapshot) { + auto& counter_snapshots = counters_snapshot_[counter]; + counter_snapshots.emplace_back(std::make_pair(timepoint, value)); + } + + std::unordered_map, uint64_t> rate_snapshot; + { + std::lock_guard l{rate_mutex_}; + std::swap(rate_snapshot, rates_); + } + + for (auto& [rate, value] : rate_snapshot) { + auto& rates_snapshots = rates_snapshot_[rate]; + rates_snapshots.emplace_back(std::make_pair(timepoint, value)); } } @@ -719,4 +656,57 @@ void Telemetry::log(std::string message, telemetry::LogLevel level, telemetry::LogMessage{std::move(message), level, stacktrace, timestamp}); } +void Telemetry::increment_counter(const Counter& id) { + increment_counter(id, {}); +} + +void Telemetry::increment_counter(const Counter& id, + const std::vector& tags) { + std::lock_guard l{counter_mutex_}; + counters_[{id, tags}] += 1; +} + +void Telemetry::decrement_counter(const Counter& id) { + decrement_counter(id, {}); 
+} + +void Telemetry::decrement_counter(const Counter& id, + const std::vector& tags) { + std::lock_guard l{counter_mutex_}; + auto& v = counters_[{id, tags}]; + if (v > 0) v -= 1; +} + +void Telemetry::set_counter(const Counter& id, uint64_t value) { + set_counter(id, {}, value); +} + +void Telemetry::set_counter(const Counter& id, + const std::vector& tags, + uint64_t value) { + std::lock_guard l{counter_mutex_}; + counters_[{id, tags}] = value; +} + +void Telemetry::set_rate(const Rate& id, uint64_t value) { + set_rate(id, {}, value); +} + +void Telemetry::set_rate(const Rate& id, const std::vector& tags, + uint64_t value) { + std::lock_guard l{rate_mutex_}; + rates_[{id, tags}] = value; +} + +void Telemetry::add_datapoint(const Distribution& id, uint64_t value) { + add_datapoint(id, {}, value); +} + +void Telemetry::add_datapoint(const Distribution& id, + const std::vector& tags, + uint64_t value) { + std::lock_guard l{distributions_mutex_}; + distributions_[{id, tags}].emplace_back(value); +} + } // namespace datadog::telemetry diff --git a/src/datadog/telemetry/telemetry_impl.h b/src/datadog/telemetry/telemetry_impl.h index 33a4d586..cd077f49 100644 --- a/src/datadog/telemetry/telemetry_impl.h +++ b/src/datadog/telemetry/telemetry_impl.h @@ -9,16 +9,29 @@ #include #include +#include + #include "json.hpp" #include "log.h" +#include "metric_context.h" #include "platform_util.h" namespace datadog::telemetry { +// This uses a reference_wrapper so references to internal metric values can +// be captured, and be iterated trivially when the values need to be +// snapshotted and published in telemetry messages. +using MetricSnapshot = std::vector>; + /// The telemetry class is responsible for handling internal telemetry data to /// track Datadog product usage. It _can_ collect and report logs and metrics. +/// +/// NOTE(@dmehala): The current implementation can lead to a significant amount +/// of overhead if the mutex is highly contended. 
Unless this is proven to be +/// indeed a bottleneck, I'll embrace KISS principle. However, in a future +/// iteration we could use multiple producer single consumer queue or +/// lock-free queue. class Telemetry final { - DefaultMetrics metrics_; /// Configuration object containing the validated settings for telemetry FinalizedConfiguration config_; /// Shared pointer to the user logger instance. @@ -32,15 +45,23 @@ class Telemetry final { tracing::Clock clock_; std::shared_ptr scheduler_; - // This uses a reference_wrapper so references to internal metric values can - // be captured, and be iterated trivially when the values need to be - // snapshotted and published in telemetry messages. - using MetricSnapshot = std::vector>; - std::vector< - std::pair, MetricSnapshot>> - metrics_snapshots_; - std::vector> user_metrics_; + /// Counter + std::mutex counter_mutex_; + std::unordered_map, uint64_t> counters_; + std::unordered_map, MetricSnapshot> counters_snapshot_; + + /// Rate + std::mutex rate_mutex_; + std::unordered_map, uint64_t> rates_; + std::unordered_map, MetricSnapshot> rates_snapshot_; + /// Distribution + /// TODO: split distribution in array of N element? + std::mutex distributions_mutex_; + std::unordered_map, std::vector> + distributions_; + + /// Configuration std::vector configuration_snapshot_; std::vector logs_; @@ -61,7 +82,6 @@ class Telemetry final { Telemetry(FinalizedConfiguration configuration, std::shared_ptr logger, std::shared_ptr client, - std::vector> user_metrics, std::shared_ptr event_scheduler, tracing::HTTPClient::URL agent_url, tracing::Clock clock = tracing::default_clock); @@ -75,10 +95,6 @@ class Telemetry final { Telemetry(Telemetry&& rhs); Telemetry& operator=(Telemetry&&); - // Provides access to the telemetry metrics for updating the values. - // This value should not be stored. - inline auto& metrics() { return metrics_; } - /// Capture and report internal error message to Datadog. /// /// @param message The error message. 
@@ -95,6 +111,27 @@ class Telemetry final { void capture_configuration_change( const std::vector& new_configuration); + /// Counter + void increment_counter(const Counter& counter); + void increment_counter(const Counter& counter, + const std::vector& tags); + void decrement_counter(const Counter& counter); + void decrement_counter(const Counter& counter, + const std::vector& tags); + void set_counter(const Counter& counter, uint64_t value); + void set_counter(const Counter& counter, const std::vector& tags, + uint64_t value); + + /// Rate + void set_rate(const Rate& rate, uint64_t value); + void set_rate(const Rate& rate, const std::vector& tags, + uint64_t value); + + /// Distribution + void add_datapoint(const Distribution& distribution, uint64_t value); + void add_datapoint(const Distribution& distribution, + const std::vector& tags, uint64_t value); + private: void send_telemetry(tracing::StringView request_type, std::string payload); diff --git a/src/datadog/telemetry_metrics.cpp b/src/datadog/telemetry_metrics.cpp new file mode 100644 index 00000000..ccf56db9 --- /dev/null +++ b/src/datadog/telemetry_metrics.cpp @@ -0,0 +1,27 @@ +#include "telemetry_metrics.h" + +namespace datadog::tracing::metrics { + +namespace tracer { +const telemetry::Counter spans_created = {"spans_created", "tracers", true}; +const telemetry::Counter spans_finished = {"spans_finished", "tracers", true}; + +const telemetry::Counter trace_segments_created = {"trace_segments_created", + "tracers", true}; + +const telemetry::Counter trace_segments_closed = {"trace_segments_closed", + "tracers", true}; +const telemetry::Counter context_header_truncated = { + "context_header.truncated", + "tracers", + true, +}; + +namespace api { +const telemetry::Counter requests = {"trace_api.requests", "tracers", true}; +const telemetry::Counter responses = {"trace_api.responses", "tracers", true}; +const telemetry::Counter errors = {"trace_api.errors", "tracers", true}; +} // namespace api +} // 
namespace tracer + +} // namespace datadog::tracing::metrics diff --git a/src/datadog/telemetry_metrics.h b/src/datadog/telemetry_metrics.h new file mode 100644 index 00000000..b3461475 --- /dev/null +++ b/src/datadog/telemetry_metrics.h @@ -0,0 +1,53 @@ +#pragma once + +#include + +namespace datadog::tracing::metrics { + +namespace tracer { + +/// The number of spans created by the tracer, tagged by manual API +/// (`integration_name:datadog`, `integration_name:otel` or +/// `integration_name:opentracing`). +extern const telemetry::Counter spans_created; + +/// The number of spans finished, optionally (if implementation allows) tagged +/// manual API (`integration_name:datadog`, `integration_name:otel` or +/// `integration_name:opentracing`). +extern const telemetry::Counter spans_finished; + +/// The number of trace segments (local traces) created, tagged with +/// new/continued depending on whether this is a new trace (no distributed +/// context information) or continued (has distributed context). +extern const telemetry::Counter trace_segments_created; + +/// The number of trace segments (local traces) closed. In non partial flush +/// scenarios, trace_segments_closed == trace_chunks_enqueued. +extern const telemetry::Counter trace_segments_closed; + +/// The number of times a context propagation header is truncated, tagged by the +/// reason for truncation (`truncation_reason:baggage_item_count_exceeded`, +/// `truncation_reason:baggage_byte_count_exceeded`). +extern const telemetry::Counter context_header_truncated; + +namespace api { + +/// The number of requests sent to the trace endpoint in the agent, regardless +/// of success. +extern const telemetry::Counter requests; + +/// The number of responses received from the trace endpoint, tagged with status +/// code, e.g. `status_code:200`, `status_code:404`. May also use +/// `status_code:5xx` for example as a catch-all for 2xx, 3xx, 4xx, 5xx +/// responses. 
+extern const telemetry::Counter responses; + +/// The number of requests sent to the trace endpoint in the agent that errored, +/// tagged by the error type (e.g. `type:timeout`, `type:network`, +/// `type:status_code`). +extern const telemetry::Counter errors; + +} // namespace api +} // namespace tracer + +} // namespace datadog::tracing::metrics diff --git a/src/datadog/trace_segment.cpp b/src/datadog/trace_segment.cpp index fcdcc060..44d0d1ee 100644 --- a/src/datadog/trace_segment.cpp +++ b/src/datadog/trace_segment.cpp @@ -16,16 +16,14 @@ #include #include -#include "collector_response.h" #include "config_manager.h" #include "hex.h" -#include "json.hpp" #include "platform_util.h" -#include "random.h" #include "span_data.h" #include "span_sampler.h" #include "tag_propagation.h" #include "tags.h" +#include "telemetry_metrics.h" #include "trace_sampler.h" #include "w3c_propagation.h" @@ -142,7 +140,7 @@ Optional TraceSegment::sampling_decision() const { Logger& TraceSegment::logger() const { return *logger_; } void TraceSegment::register_span(std::unique_ptr span) { - telemetry::metrics().tracer.spans_created.inc(); + telemetry::counter::increment(metrics::tracer::spans_created); std::lock_guard lock(mutex_); assert(spans_.empty() || num_finished_spans_ < spans_.size()); @@ -151,7 +149,7 @@ void TraceSegment::register_span(std::unique_ptr span) { void TraceSegment::span_finished() { { - telemetry::metrics().tracer.spans_finished.inc(); + telemetry::counter::increment(metrics::tracer::spans_finished); std::lock_guard lock(mutex_); ++num_finished_spans_; assert(num_finished_spans_ <= spans_.size()); @@ -242,7 +240,7 @@ void TraceSegment::span_finished() { } } - telemetry::metrics().tracer.trace_segments_closed.inc(); + telemetry::counter::increment(metrics::tracer::trace_segments_closed); } void TraceSegment::override_sampling_priority(SamplingPriority priority) { diff --git a/src/datadog/tracer.cpp b/src/datadog/tracer.cpp index dac30f13..14eb0ad9 100644 --- 
a/src/datadog/tracer.cpp +++ b/src/datadog/tracer.cpp @@ -26,6 +26,7 @@ #include "span_data.h" #include "span_sampler.h" #include "tags.h" +#include "telemetry_metrics.h" #include "trace_sampler.h" #include "w3c_propagation.h" @@ -59,7 +60,6 @@ Tracer::Tracer(const FinalizedTracerConfig& config, baggage_injection_enabled_(false), baggage_extraction_enabled_(false) { telemetry::init(config.telemetry, logger_, config.http_client, - std::vector>{}, config.event_scheduler, config.agent_url); if (config.report_hostname) { hostname_ = get_hostname(); @@ -181,7 +181,8 @@ Span Tracer::create_span(const SpanConfig& config) { } const auto span_data_ptr = span_data.get(); - telemetry::metrics().tracer.trace_segments_created_new.inc(); + telemetry::counter::increment(metrics::tracer::trace_segments_created, + {"new_continued:new"}); const auto segment = std::make_shared( logger_, collector_, config_manager_->trace_sampler(), span_sampler_, defaults, config_manager_, runtime_id_, injection_styles_, hostname_, @@ -380,7 +381,8 @@ Expected Tracer::extract_span(const DictReader& reader, } const auto span_data_ptr = span_data.get(); - telemetry::metrics().tracer.trace_segments_created_continued.inc(); + telemetry::counter::increment(metrics::tracer::trace_segments_created, + {"new_continued:continued"}); const auto segment = std::make_shared( logger_, collector_, config_manager_->trace_sampler(), span_sampler_, config_manager_->span_defaults(), config_manager_, runtime_id_, @@ -441,9 +443,13 @@ Expected Tracer::inject(const Baggage& baggage, DictWriter& writer) { err->with_prefix("failed to serialize all baggage items: ")); if (err->code == Error::Code::BAGGAGE_MAXIMUM_BYTES_REACHED) { - telemetry::metrics().tracer.baggage_bytes_exceeded.inc(); + telemetry::counter::increment( + metrics::tracer::context_header_truncated, + {"truncation_reason:baggage_byte_count_exceeded"}); } else if (err->code == Error::Code::BAGGAGE_MAXIMUM_ITEMS_REACHED) { - 
telemetry::metrics().tracer.baggage_items_exceeded.inc(); + telemetry::counter::increment( + metrics::tracer::context_header_truncated, + {"truncation_reason:baggage_item_count_exceeded"}); } } diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index ab4f084b..ecc6aede 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -18,7 +18,6 @@ add_executable(tests # telemetry test cases telemetry/test_configuration.cpp - telemetry/test_metrics.cpp telemetry/test_telemetry.cpp # test cases diff --git a/test/telemetry/test_metrics.cpp b/test/telemetry/test_metrics.cpp deleted file mode 100644 index 3078c084..00000000 --- a/test/telemetry/test_metrics.cpp +++ /dev/null @@ -1,40 +0,0 @@ -// This test covers operations defined for metrics defined in `metrics.h`. - -#include - -#include "test.h" - -using namespace datadog::telemetry; - -#define TELEMETRY_METRICS_TEST(x) \ - TEST_CASE(x, "[telemetry],[telemetry.metrics]") - -TELEMETRY_METRICS_TEST("Counter metrics") { - CounterMetric metric = { - "test.counter.metric", "test_scope", {"testing-testing:123"}, true}; - - metric.inc(); - metric.add(41); - REQUIRE(metric.value() == 42); - auto captured_value = metric.capture_and_reset_value(); - REQUIRE(captured_value == 42); - REQUIRE(metric.value() == 0); -} - -TELEMETRY_METRICS_TEST("Gauge metrics") { - GaugeMetric metric = { - "test.gauge.metric", "test_scope", {"testing-testing:123"}, true}; - metric.set(40); - metric.inc(); - metric.add(10); - metric.sub(8); - metric.dec(); - REQUIRE(metric.value() == 42); - auto captured_value = metric.capture_and_reset_value(); - REQUIRE(captured_value == 42); - REQUIRE(metric.value() == 0); - - metric.add(10); - metric.sub(11); - REQUIRE(metric.value() == 0); -} diff --git a/test/telemetry/test_telemetry.cpp b/test/telemetry/test_telemetry.cpp index f3795475..6ec74710 100644 --- a/test/telemetry/test_telemetry.cpp +++ b/test/telemetry/test_telemetry.cpp @@ -89,12 +89,7 @@ TELEMETRY_IMPLEMENTATION_TEST("Tracer telemetry 
lifecycle") { SECTION("ctor send app-started message") { SECTION("Without a defined integration") { - Telemetry telemetry{*finalize_config(), - logger, - client, - std::vector>{}, - scheduler, - *url}; + Telemetry telemetry{*finalize_config(), logger, client, scheduler, *url}; /// By default the integration is `datadog` with the tracer version. /// TODO: remove the default because these datadog are already part of the /// request header. @@ -114,11 +109,7 @@ TELEMETRY_IMPLEMENTATION_TEST("Tracer telemetry lifecycle") { Configuration cfg; cfg.integration_name = "nginx"; cfg.integration_version = "1.25.2"; - Telemetry telemetry2{*finalize_config(cfg), - logger, - client, - std::vector>{}, - scheduler, + Telemetry telemetry2{*finalize_config(cfg), logger, client, scheduler, *url}; auto app_started = nlohmann::json::parse(client->request_body); @@ -144,12 +135,7 @@ TELEMETRY_IMPLEMENTATION_TEST("Tracer telemetry lifecycle") { ddtest::EnvGuard install_time_env("DD_INSTRUMENTATION_INSTALL_TIME", "1703188212"); - Telemetry telemetry4{*finalize_config(), - logger, - client, - std::vector>{}, - scheduler, - *url}; + Telemetry telemetry4{*finalize_config(), logger, client, scheduler, *url}; auto app_started = nlohmann::json::parse(client->request_body); REQUIRE(is_valid_telemetry_payload(app_started) == true); @@ -191,11 +177,7 @@ TELEMETRY_IMPLEMENTATION_TEST("Tracer telemetry lifecycle") { Configuration cfg; cfg.products.emplace_back(std::move(product)); - Telemetry telemetry3{*finalize_config(cfg), - logger, - client, - std::vector>{}, - scheduler, + Telemetry telemetry3{*finalize_config(cfg), logger, client, scheduler, *url}; auto app_started = nlohmann::json::parse(client->request_body); @@ -299,12 +281,7 @@ TELEMETRY_IMPLEMENTATION_TEST("Tracer telemetry lifecycle") { SECTION("dtor send app-closing message") { { - Telemetry telemetry{*finalize_config(), - logger, - client, - std::vector>{}, - scheduler, - *url}; + Telemetry telemetry{*finalize_config(), logger, client, 
scheduler, *url}; client->clear(); } @@ -335,13 +312,8 @@ TELEMETRY_IMPLEMENTATION_TEST("Tracer telemetry API") { auto url = HTTPClient::URL::parse("http://localhost:8000"); - Telemetry telemetry{*finalize_config(), - logger, - client, - std::vector>{}, - scheduler, - *url, - clock}; + Telemetry telemetry{*finalize_config(), logger, client, + scheduler, *url, clock}; SECTION("generates a heartbeat message") { client->clear(); @@ -355,100 +327,305 @@ TELEMETRY_IMPLEMENTATION_TEST("Tracer telemetry API") { REQUIRE(heartbeat["request_type"] == "app-heartbeat"); } - SECTION("captures metrics and sends generate-metrics payload") { - telemetry.metrics().tracer.trace_segments_created_new.inc(); - REQUIRE(telemetry.metrics().tracer.trace_segments_created_new.value() == 1); - scheduler->trigger_metrics_capture(); - scheduler->trigger_heartbeat(); + SECTION("metrics reporting") { + SECTION("counters are correctly serialized in generate-metrics payload") { + client->clear(); + /// test cases for counters: + /// - can't decrement below zero. -> is that a telemetry requirements? + /// - rates or counter reset to zero after capture. + const Counter my_counter{"my_counter", "counter-test", true}; + telemetry.increment_counter(my_counter); // = 1 + telemetry.increment_counter(my_counter); // = 2 + telemetry.increment_counter(my_counter); // = 3 + telemetry.decrement_counter(my_counter); // = 2 + scheduler->trigger_metrics_capture(); + + telemetry.increment_counter(my_counter); // = 1 + scheduler->trigger_metrics_capture(); + + telemetry.set_counter(my_counter, 42); + telemetry.set_counter(my_counter, {"event:test"}, 100); + telemetry.decrement_counter(my_counter, {"event:test"}); + scheduler->trigger_metrics_capture(); + + // Expect 2 series: + // - `my_counter` without tags: 3 datapoint (2, 1, 42) with the same + // timestamp. + // - `my_counter` with `event:test` tags: 1 datapoint (99). 
+ scheduler->trigger_heartbeat(); - REQUIRE(telemetry.metrics().tracer.trace_segments_created_new.value() == 0); + auto message_batch = nlohmann::json::parse(client->request_body); + REQUIRE(is_valid_telemetry_payload(message_batch) == true); + REQUIRE(message_batch["payload"].size() == 2); + auto generate_metrics = message_batch["payload"][1]; + REQUIRE(generate_metrics["request_type"] == "generate-metrics"); + auto payload = generate_metrics["payload"]; + + auto series = payload["series"]; + REQUIRE(series.size() == 2); + + for (const auto& s : series) { + CHECK(s["metric"] == "my_counter"); + CHECK(s["type"] == "count"); + CHECK(s["common"] == true); + CHECK(s["namespace"] == "counter-test"); + if (s.contains("tags")) { + REQUIRE(s["tags"].size() == 1); + CHECK(s["tags"][0] == "event:test"); + + auto points = s["points"]; + REQUIRE(points.size() == 1); + + CHECK(points[0][0] == mock_time); + CHECK(points[0][1] == 99); + } else { + auto points = s["points"]; + REQUIRE(points.size() == 3); + + CHECK(points[0][0] == mock_time); + CHECK(points[0][1] == 2); + + CHECK(points[1][0] == mock_time); + CHECK(points[1][1] == 1); + + CHECK(points[2][0] == mock_time); + CHECK(points[2][1] == 42); + } + } - auto heartbeat_and_telemetry_message = client->request_body; - auto message_batch = nlohmann::json::parse(heartbeat_and_telemetry_message); - REQUIRE(is_valid_telemetry_payload(message_batch) == true); - REQUIRE(message_batch["payload"].size() == 2); - auto generate_metrics = message_batch["payload"][1]; - REQUIRE(generate_metrics["request_type"] == "generate-metrics"); - auto payload = generate_metrics["payload"]; - auto series = payload["series"]; - REQUIRE(series.size() == 1); - auto metric = series[0]; - REQUIRE(metric["metric"] == "trace_segments_created"); - auto tags = metric["tags"]; - REQUIRE(tags.size() == 1); - REQUIRE(tags[0] == "new_continued:new"); - auto points = metric["points"]; - REQUIRE(points.size() == 1); - REQUIRE(points[0][0] == mock_time); - 
REQUIRE(points[0][1] == 1); - } + // Make sure the next heartbeat doesn't contains counters if no + // datapoint has been incremented, decremented or set. + client->clear(); + scheduler->trigger_heartbeat(); - SECTION("logs serialization") { - SECTION("log level is correct") { - struct TestCase { - std::string_view name; - std::string input; - Optional stacktrace; - std::function& stacktrace)> - apply; - std::string expected_log_level; - }; + auto message_batch2 = nlohmann::json::parse(client->request_body); + REQUIRE(is_valid_telemetry_payload(message_batch2) == true); + REQUIRE(message_batch2["payload"].size() == 1); - auto test_case = GENERATE(values({ - { - "warning log", - "This is a warning log!", - nullopt, - [](Telemetry& telemetry, const std::string& input, - const Optional&) { - telemetry.log_warning(input); - }, - "WARNING", - }, - { - "error log", - "This is an error log!", - nullopt, - [](Telemetry& telemetry, const std::string& input, - const Optional&) { telemetry.log_error(input); }, - "ERROR", - }, - { - "error log with stacktrace", - "This is an error log with a fake stacktrace!", - "error here\nthen here\nfinally here\n", - [](Telemetry& telemetry, const std::string& input, - Optional stacktrace) { - telemetry.log_error(input, *stacktrace); - }, - "ERROR", - }, - })); + auto payload2 = message_batch["payload"][0]; + CHECK(payload2["request_type"] == "app-heartbeat"); + } + + SECTION("rate") { + client->clear(); + + Rate rps{"request", "rate-test", true}; + telemetry.set_rate(rps, 1000); + + scheduler->trigger_metrics_capture(); + + telemetry.set_rate(rps, 2000); + telemetry.set_rate(rps, 5000); + telemetry.set_rate(rps, {"status:2xx"}, 5000); + + scheduler->trigger_metrics_capture(); + + // Expect 2 series: + // - `request` without tags: 2 datapoint (1000, 5000) + // - `request` with tags: 1 datapoint (5000) + scheduler->trigger_heartbeat(); + + auto message_batch = nlohmann::json::parse(client->request_body); + 
REQUIRE(is_valid_telemetry_payload(message_batch) == true); + REQUIRE(message_batch["payload"].size() == 2); + auto generate_metrics = message_batch["payload"][1]; + REQUIRE(generate_metrics["request_type"] == "generate-metrics"); + auto payload = generate_metrics["payload"]; + + auto series = payload["series"]; + REQUIRE(series.size() == 2); + + for (const auto& s : series) { + CHECK(s["metric"] == "request"); + CHECK(s["type"] == "rate"); + CHECK(s["common"] == true); + CHECK(s["namespace"] == "rate-test"); + if (s.contains("tags")) { + REQUIRE(s["tags"].size() == 1); + CHECK(s["tags"][0] == "status:2xx"); + + auto points = s["points"]; + REQUIRE(points.size() == 1); + + CHECK(points[0][0] == mock_time); + CHECK(points[0][1] == 5000); + } else { + auto points = s["points"]; + REQUIRE(points.size() == 2); + + CHECK(points[0][0] == mock_time); + CHECK(points[0][1] == 1000); + + CHECK(points[1][0] == mock_time); + CHECK(points[1][1] == 5000); + } + } - CAPTURE(test_case.name); + // Make sure the next heartbeat doesn't contains distributions if no + // datapoint has been added to a distribution. + client->clear(); + scheduler->trigger_heartbeat(); + + auto message_batch2 = nlohmann::json::parse(client->request_body); + REQUIRE(is_valid_telemetry_payload(message_batch2) == true); + REQUIRE(message_batch2["payload"].size() == 1); + auto payload2 = message_batch["payload"][0]; + CHECK(payload2["request_type"] == "app-heartbeat"); + } + + SECTION("distribution") { client->clear(); - test_case.apply(telemetry, test_case.input, test_case.stacktrace); + + Distribution response_time{"response_time", "dist-test", false}; + telemetry.add_datapoint(response_time, 128); + telemetry.add_datapoint(response_time, 42); + telemetry.add_datapoint(response_time, 3000); + + // Add a tag, this will add a new serie to the distribution payload. 
+ telemetry.add_datapoint(response_time, {"status:200", "method:GET"}, + 6530); + + Distribution request_size{"request_size", "dist-test-2", true}; + telemetry.add_datapoint(request_size, 1843); + telemetry.add_datapoint(request_size, 4135); + + // Expect 3 series: + // - `response_time` without tags: 3 datapoint (128, 42, 3000). + // - `response_time` with 2 tags: 1 datapoint (6530). + // - `request_size`: 2 datapoint (1843, 4135). scheduler->trigger_heartbeat(); auto message_batch = nlohmann::json::parse(client->request_body); - REQUIRE(is_valid_telemetry_payload(message_batch)); + REQUIRE(is_valid_telemetry_payload(message_batch) == true); REQUIRE(message_batch["payload"].size() == 2); - auto logs_message = message_batch["payload"][1]; - REQUIRE(logs_message["request_type"] == "logs"); + auto distribution_message = message_batch["payload"][1]; + REQUIRE(distribution_message["request_type"] == "distributions"); + + auto distribution_series = distribution_message["payload"]["series"]; + REQUIRE(distribution_series.size() == 3); - auto logs_payload = logs_message["payload"]["logs"]; - REQUIRE(logs_payload.size() == 1); - CHECK(logs_payload[0]["level"] == test_case.expected_log_level); - CHECK(logs_payload[0]["message"] == test_case.input); - CHECK(logs_payload[0].contains("tracer_time")); + for (auto& dist : distribution_series) { + if (dist["metric"] == "response_time") { + CHECK(dist["common"] == false); + CHECK(dist["namespace"] == "dist-test"); + if (dist.contains("tags")) { + const std::vector expected_tags{"status:200", + "method:GET"}; + CHECK(dist["tags"] == expected_tags); - if (test_case.stacktrace) { - CHECK(logs_payload[0]["stack_trace"] == test_case.stacktrace); - } else { - CHECK(logs_payload[0].contains("stack_trace") == false); + auto datapoint = dist["points"]; + REQUIRE(datapoint.size() == 1); + + const std::vector expected_points{6530}; + CHECK(expected_points == datapoint); + } else { + auto datapoint = dist["points"]; + REQUIRE(datapoint.size() 
== 3); + + const std::vector expected_points{128, 42, 3000}; + CHECK(expected_points == datapoint); + } + } else if (dist["metric"] == "request_size") { + CHECK(dist["common"] == true); + CHECK(dist["namespace"] == "dist-test-2"); + + auto datapoint = dist["points"]; + REQUIRE(datapoint.size() == 2); + + const std::vector expected_points{1843, 4135}; + CHECK(expected_points == datapoint); + } else { + FAIL( + "expected distribution name {response_time, request_size} but " + "got " + << dist["metric"]); + } + } + + // Make sure the next heartbeat doesn't contains distributions if no + // datapoint has been added to a distribution. + client->clear(); + scheduler->trigger_heartbeat(); + + auto message_batch2 = nlohmann::json::parse(client->request_body); + REQUIRE(is_valid_telemetry_payload(message_batch2) == true); + REQUIRE(message_batch2["payload"].size() == 1); + + auto payload = message_batch["payload"][0]; + CHECK(payload["request_type"] == "app-heartbeat"); + } + + SECTION("logs serialization") { + SECTION("log level is correct") { + struct TestCase { + std::string_view name; + std::string input; + Optional stacktrace; + std::function& stacktrace)> + apply; + std::string expected_log_level; + }; + + auto test_case = GENERATE(values({ + { + "warning log", + "This is a warning log!", + nullopt, + [](Telemetry& telemetry, const std::string& input, + const Optional&) { + telemetry.log_warning(input); + }, + "WARNING", + }, + { + "error log", + "This is an error log!", + nullopt, + [](Telemetry& telemetry, const std::string& input, + const Optional&) { + telemetry.log_error(input); + }, + "ERROR", + }, + { + "error log with stacktrace", + "This is an error log with a fake stacktrace!", + "error here\nthen here\nfinally here\n", + [](Telemetry& telemetry, const std::string& input, + Optional stacktrace) { + telemetry.log_error(input, *stacktrace); + }, + "ERROR", + }, + })); + + CAPTURE(test_case.name); + + client->clear(); + test_case.apply(telemetry, 
test_case.input, test_case.stacktrace); + scheduler->trigger_heartbeat(); + + auto message_batch = nlohmann::json::parse(client->request_body); + REQUIRE(is_valid_telemetry_payload(message_batch)); + REQUIRE(message_batch["payload"].size() == 2); + + auto logs_message = message_batch["payload"][1]; + REQUIRE(logs_message["request_type"] == "logs"); + + auto logs_payload = logs_message["payload"]["logs"]; + REQUIRE(logs_payload.size() == 1); + CHECK(logs_payload[0]["level"] == test_case.expected_log_level); + CHECK(logs_payload[0]["message"] == test_case.input); + CHECK(logs_payload[0].contains("tracer_time")); + + if (test_case.stacktrace) { + CHECK(logs_payload[0]["stack_trace"] == test_case.stacktrace); + } else { + CHECK(logs_payload[0].contains("stack_trace") == false); + } } } } @@ -464,7 +641,6 @@ TELEMETRY_IMPLEMENTATION_TEST("Tracer telemetry configuration") { auto logger = std::make_shared(); auto client = std::make_shared(); auto scheduler = std::make_shared(); - std::vector> metrics; const TracerSignature tracer_signature{ /* runtime_id = */ RuntimeID::generate(), @@ -480,7 +656,7 @@ TELEMETRY_IMPLEMENTATION_TEST("Tracer telemetry configuration") { auto final_cfg = finalize_config(cfg); REQUIRE(final_cfg); - Telemetry telemetry(*final_cfg, logger, client, metrics, scheduler, *url); + Telemetry telemetry(*final_cfg, logger, client, scheduler, *url); CHECK(scheduler->metrics_callback == nullptr); CHECK(scheduler->metrics_interval == nullopt); } @@ -493,7 +669,7 @@ TELEMETRY_IMPLEMENTATION_TEST("Tracer telemetry configuration") { auto final_cfg = finalize_config(cfg); REQUIRE(final_cfg); - Telemetry telemetry(*final_cfg, logger, client, metrics, scheduler, *url); + Telemetry telemetry(*final_cfg, logger, client, scheduler, *url); CHECK(scheduler->metrics_callback != nullptr); CHECK(scheduler->metrics_interval == 500ms); @@ -510,7 +686,7 @@ TELEMETRY_IMPLEMENTATION_TEST("Tracer telemetry configuration") { auto final_cfg = finalize_config(cfg); 
REQUIRE(final_cfg); - Telemetry telemetry(*final_cfg, logger, client, metrics, scheduler, *url); + Telemetry telemetry(*final_cfg, logger, client, scheduler, *url); telemetry.log_error("error"); // NOTE(@dmehala): logs are sent with an heartbeat. From 555b5f808d5c7c84df27d6475d3b957870cad3c2 Mon Sep 17 00:00:00 2001 From: Damien Mehala Date: Tue, 15 Apr 2025 10:56:19 +0200 Subject: [PATCH 2/9] simplify test and send metrics in app-closing --- src/datadog/telemetry/telemetry_impl.cpp | 19 +- test/telemetry/test_telemetry.cpp | 237 ++++++++++++++--------- 2 files changed, 165 insertions(+), 91 deletions(-) diff --git a/src/datadog/telemetry/telemetry_impl.cpp b/src/datadog/telemetry/telemetry_impl.cpp index 3fb7393f..6e87ad38 100644 --- a/src/datadog/telemetry/telemetry_impl.cpp +++ b/src/datadog/telemetry/telemetry_impl.cpp @@ -406,8 +406,9 @@ std::string Telemetry::app_closing() { }); batch_payloads.emplace_back(std::move(app_closing)); - auto metrics = nlohmann::json::array(); - + nlohmann::json::array_t metrics = nlohmann::json::array(); + encode_metrics(metrics, counters_snapshot_); + encode_metrics(metrics, rates_snapshot_); if (!metrics.empty()) { auto generate_metrics = nlohmann::json::object({ {"request_type", "generate-metrics"}, @@ -418,6 +419,20 @@ std::string Telemetry::app_closing() { batch_payloads.emplace_back(std::move(generate_metrics)); } + if (auto distributions_series = encode_distributions(distributions_); + !distributions_.empty()) { + auto distributions_json = nlohmann::json{ + {"request_type", "distributions"}, + { + "payload", + nlohmann::json{ + {"series", distributions_series}, + }, + }, + }; + batch_payloads.emplace_back(std::move(distributions_json)); + } + if (!logs_.empty()) { auto encoded_logs = nlohmann::json::array(); for (const auto& log : logs_) { diff --git a/test/telemetry/test_telemetry.cpp b/test/telemetry/test_telemetry.cpp index 6ec74710..d408fa9c 100644 --- a/test/telemetry/test_telemetry.cpp +++ 
b/test/telemetry/test_telemetry.cpp @@ -364,34 +364,32 @@ TELEMETRY_IMPLEMENTATION_TEST("Tracer telemetry API") { auto series = payload["series"]; REQUIRE(series.size() == 2); - for (const auto& s : series) { - CHECK(s["metric"] == "my_counter"); - CHECK(s["type"] == "count"); - CHECK(s["common"] == true); - CHECK(s["namespace"] == "counter-test"); - if (s.contains("tags")) { - REQUIRE(s["tags"].size() == 1); - CHECK(s["tags"][0] == "event:test"); - - auto points = s["points"]; - REQUIRE(points.size() == 1); - - CHECK(points[0][0] == mock_time); - CHECK(points[0][1] == 99); - } else { - auto points = s["points"]; - REQUIRE(points.size() == 3); - - CHECK(points[0][0] == mock_time); - CHECK(points[0][1] == 2); - - CHECK(points[1][0] == mock_time); - CHECK(points[1][1] == 1); - - CHECK(points[2][0] == mock_time); - CHECK(points[2][1] == 42); - } - } + const auto expected_metrics = nlohmann::json::parse(R"( + [ + { + "common": true, + "metric": "my_counter", + "namespace": "counter-test", + "points": [ + [ 1672484400, 99 ] + ], + "tags": [ "event:test" ], + "type": "count" + }, + { + "common": true, + "metric": "my_counter", + "namespace": "counter-test", + "points": [ + [ 1672484400, 2 ], + [ 1672484400, 1 ], + [ 1672484400, 42 ] + ], + "type": "count" + } + ] + )"); + CHECK(series == expected_metrics); // Make sure the next heartbeat doesn't contains counters if no // datapoint has been incremented, decremented or set. 
@@ -435,31 +433,31 @@ TELEMETRY_IMPLEMENTATION_TEST("Tracer telemetry API") { auto series = payload["series"]; REQUIRE(series.size() == 2); - for (const auto& s : series) { - CHECK(s["metric"] == "request"); - CHECK(s["type"] == "rate"); - CHECK(s["common"] == true); - CHECK(s["namespace"] == "rate-test"); - if (s.contains("tags")) { - REQUIRE(s["tags"].size() == 1); - CHECK(s["tags"][0] == "status:2xx"); - - auto points = s["points"]; - REQUIRE(points.size() == 1); - - CHECK(points[0][0] == mock_time); - CHECK(points[0][1] == 5000); - } else { - auto points = s["points"]; - REQUIRE(points.size() == 2); - - CHECK(points[0][0] == mock_time); - CHECK(points[0][1] == 1000); - - CHECK(points[1][0] == mock_time); - CHECK(points[1][1] == 5000); - } - } + const auto expected_metrics = nlohmann::json::parse(R"( + [ + { + "common": true, + "metric": "request", + "namespace": "rate-test", + "points": [ + [ 1672484400, 5000 ] + ], + "tags": [ "status:2xx" ], + "type": "rate" + }, + { + "common": true, + "metric": "request", + "namespace": "rate-test", + "points": [ + [ 1672484400, 1000 ], + [ 1672484400, 5000 ] + ], + "type": "rate" + } + ] + )"); + CHECK(series == expected_metrics); // Make sure the next heartbeat doesn't contains distributions if no // datapoint has been added to a distribution. 
@@ -506,43 +504,29 @@ TELEMETRY_IMPLEMENTATION_TEST("Tracer telemetry API") { auto distribution_series = distribution_message["payload"]["series"]; REQUIRE(distribution_series.size() == 3); - for (auto& dist : distribution_series) { - if (dist["metric"] == "response_time") { - CHECK(dist["common"] == false); - CHECK(dist["namespace"] == "dist-test"); - if (dist.contains("tags")) { - const std::vector expected_tags{"status:200", - "method:GET"}; - CHECK(dist["tags"] == expected_tags); - - auto datapoint = dist["points"]; - REQUIRE(datapoint.size() == 1); - - const std::vector expected_points{6530}; - CHECK(expected_points == datapoint); - } else { - auto datapoint = dist["points"]; - REQUIRE(datapoint.size() == 3); - - const std::vector expected_points{128, 42, 3000}; - CHECK(expected_points == datapoint); - } - } else if (dist["metric"] == "request_size") { - CHECK(dist["common"] == true); - CHECK(dist["namespace"] == "dist-test-2"); - - auto datapoint = dist["points"]; - REQUIRE(datapoint.size() == 2); - - const std::vector expected_points{1843, 4135}; - CHECK(expected_points == datapoint); - } else { - FAIL( - "expected distribution name {response_time, request_size} but " - "got " - << dist["metric"]); + const auto expected_series = nlohmann::json::parse(R"([ + { + "common":false, + "metric":"response_time", + "namespace":"dist-test", + "points": [6530], + "tags":["status:200","method:GET"] + }, + { + "common":true, + "metric": "request_size", + "namespace":"dist-test-2", + "points":[1843,4135] + }, + { + "common": false, + "metric":"response_time", + "namespace":"dist-test", + "points":[128,42,3000] } - } + ])"); + + CHECK(distribution_series == expected_series); // Make sure the next heartbeat doesn't contains distributions if no // datapoint has been added to a distribution. 
@@ -557,6 +541,81 @@ TELEMETRY_IMPLEMENTATION_TEST("Tracer telemetry API") { CHECK(payload["request_type"] == "app-heartbeat"); } + SECTION("dtor sends metrics and distributions") { + // metrics captured before the aggregation task + const Clock clock = [] { + TimePoint result; + result.wall = std::chrono::system_clock::from_time_t(1744706125); + return result; + }; + const Distribution response_time{"response_time", "dist-test", false}; + const Rate rps{"request", "rate-test", true}; + const Counter my_counter{"my_counter", "counter-test", true}; + { + Telemetry telemetry{*finalize_config(), logger, client, + scheduler, *url, clock}; + telemetry.increment_counter(my_counter); // = 1 + telemetry.add_datapoint(response_time, 128); + telemetry.set_rate(rps, 1000); + client->clear(); + } + + // Expect 2 metrics with 1 datapoint each and 1 distribution + auto message_batch = nlohmann::json::parse(client->request_body); + REQUIRE(is_valid_telemetry_payload(message_batch) == true); + REQUIRE(message_batch["payload"].size() == 3); + + for (const auto& payload : message_batch["payload"]) { + const auto& req_type = payload["request_type"]; + if (req_type == "generate-metrics") { + const auto& metrics_series = payload["payload"]["series"]; + REQUIRE(metrics_series.size() == 2); + + for (const auto& s : metrics_series) { + if (s["metric"] == "my_counter") { + const auto expected_counter = nlohmann::json::parse(R"( + { + "common":true, + "metric":"my_counter", + "namespace":"counter-test", + "type": "count", + "points": [[1744706125, 1]] + } + )"); + CHECK(s == expected_counter); + } else if (s["metric"] == "request") { + const auto expected_rate = nlohmann::json::parse(R"( + { + "common":true, + "metric":"request", + "namespace":"rate-test", + "type": "rate", + "points": [[1744706125, 1000]] + } + )"); + CHECK(s == expected_rate); + } else { + FAIL("unexpected metrics name, got " << s["metric"]); + } + } + } else if (req_type == "distributions") { + const auto&
distribution_series = payload["payload"]["series"]; + REQUIRE(distribution_series.size() == 1); + + const auto& d0 = distribution_series[0]; + const auto expected_d0 = nlohmann::json::parse(R"( + { + "common":false, + "metric":"response_time", + "namespace":"dist-test", + "points": [128] + } + )"); + CHECK(d0 == expected_d0); + } + } + } + SECTION("logs serialization") { SECTION("log level is correct") { struct TestCase { From 61287d9cb10472ea36bd27862b709fe96d14641e Mon Sep 17 00:00:00 2001 From: Damien Mehala Date: Tue, 15 Apr 2025 11:05:21 +0200 Subject: [PATCH 3/9] add counter test --- test/telemetry/test_telemetry.cpp | 37 +++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/test/telemetry/test_telemetry.cpp b/test/telemetry/test_telemetry.cpp index d408fa9c..e76b10b6 100644 --- a/test/telemetry/test_telemetry.cpp +++ b/test/telemetry/test_telemetry.cpp @@ -404,6 +404,43 @@ TELEMETRY_IMPLEMENTATION_TEST("Tracer telemetry API") { CHECK(payload2["request_type"] == "app-heartbeat"); } + SECTION("counters can't go below zero") { + client->clear(); + const Counter positive_counter{"positive_counter", "counter-test2", true}; + telemetry.decrement_counter(positive_counter); // = 0 + telemetry.decrement_counter(positive_counter); // = 0 + telemetry.decrement_counter(positive_counter); // = 0 + + scheduler->trigger_metrics_capture(); + scheduler->trigger_heartbeat(); + + auto message_batch = nlohmann::json::parse(client->request_body); + REQUIRE(is_valid_telemetry_payload(message_batch) == true); + REQUIRE(message_batch["payload"].size() == 2); + + auto generate_metrics = message_batch["payload"][1]; + REQUIRE(generate_metrics["request_type"] == "generate-metrics"); + auto payload = generate_metrics["payload"]; + + auto series = payload["series"]; + REQUIRE(series.size() == 1); + + const auto expected_metrics = nlohmann::json::parse(R"( + [ + { + "common": true, + "metric": "positive_counter", + "namespace": "counter-test2", + "points": [ + [ 
1672484400, 0 ] + ], + "type": "count" + } + ] + )"); + CHECK(series == expected_metrics); + } + SECTION("rate") { client->clear(); From 7a0f11b85540635ca020aebf99e65748fe3f4a10 Mon Sep 17 00:00:00 2001 From: Damien Mehala Date: Tue, 15 Apr 2025 11:09:29 +0200 Subject: [PATCH 4/9] fix typo --- include/datadog/telemetry/telemetry.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/include/datadog/telemetry/telemetry.h b/include/datadog/telemetry/telemetry.h index 9df22ff7..f8ae4ed6 100644 --- a/include/datadog/telemetry/telemetry.h +++ b/include/datadog/telemetry/telemetry.h @@ -74,7 +74,7 @@ void report_error_log(std::string message); void report_error_log(std::string message, std::string stacktrace); /// The `counter` namespace provides functions to track values. -/// Counters can be useful for tracking the total number of an event occuring in +/// Counters can be useful for tracking the total number of an event occurring in /// one time interval. For example, the amount of requests, errors or jobs /// processed every 10 seconds. namespace counter { @@ -118,7 +118,7 @@ void set(const Counter& counter, const std::vector& tags, } // namespace counter /// The `rate` namespace provides support for rate metrics-values. -/// Rates can be useful for tracking the total number of an event occurences in +/// Rates can be useful for tracking the total number of an event occurrences in /// one time interval. For example, the number of requests per second. 
namespace rate { From 292e080e739ff0d1576537f7f81820ee8f42ece6 Mon Sep 17 00:00:00 2001 From: Damien Mehala Date: Tue, 15 Apr 2025 11:45:18 +0200 Subject: [PATCH 5/9] fix json equality --- test/telemetry/test_telemetry.cpp | 27 +++++++++++++++++++++++---- 1 file changed, 23 insertions(+), 4 deletions(-) diff --git a/test/telemetry/test_telemetry.cpp b/test/telemetry/test_telemetry.cpp index e76b10b6..4254d97d 100644 --- a/test/telemetry/test_telemetry.cpp +++ b/test/telemetry/test_telemetry.cpp @@ -389,7 +389,14 @@ TELEMETRY_IMPLEMENTATION_TEST("Tracer telemetry API") { } ] )"); - CHECK(series == expected_metrics); + + for (const auto& s : series) { + if (s.contains("tags")) { + CHECK(s == expected_metrics[0]); + } else { + CHECK(s == expected_metrics[1]); + } + } // Make sure the next heartbeat doesn't contains counters if no // datapoint has been incremented, decremented or set. @@ -438,7 +445,7 @@ TELEMETRY_IMPLEMENTATION_TEST("Tracer telemetry API") { } ] )"); - CHECK(series == expected_metrics); + CHECK(is_same_json(series, expected_metrics)); } SECTION("rate") { @@ -494,7 +501,7 @@ TELEMETRY_IMPLEMENTATION_TEST("Tracer telemetry API") { } ] )"); - CHECK(series == expected_metrics); + CHECK(is_same_json(series, expected_metrics)); // Make sure the next heartbeat doesn't contains distributions if no // datapoint has been added to a distribution. @@ -563,7 +570,19 @@ TELEMETRY_IMPLEMENTATION_TEST("Tracer telemetry API") { } ])"); - CHECK(distribution_series == expected_series); + for (const auto& s : distribution_series) { + if (s["metric"] == "response_time") { + if (s.contains("tags")) { + CHECK(s == expected_series[0]); + } else { + CHECK(s == expected_series[2]); + } + } else if (s["metric"] == "request_size") { + CHECK(s == expected_series[1]); + } else { + FAIL(); + } + } // Make sure the next heartbeat doesn't contains distributions if no // datapoint has been added to a distribution. 
From 068446509e82d0fc0d8efdd828c08a226e12b899 Mon Sep 17 00:00:00 2001 From: Damien Mehala Date: Tue, 15 Apr 2025 11:53:02 +0200 Subject: [PATCH 6/9] fix json ordering 2 --- test/telemetry/test_telemetry.cpp | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/test/telemetry/test_telemetry.cpp b/test/telemetry/test_telemetry.cpp index 4254d97d..01bcf6b5 100644 --- a/test/telemetry/test_telemetry.cpp +++ b/test/telemetry/test_telemetry.cpp @@ -445,7 +445,7 @@ TELEMETRY_IMPLEMENTATION_TEST("Tracer telemetry API") { } ] )"); - CHECK(is_same_json(series, expected_metrics)); + CHECK(series == expected_metrics); } SECTION("rate") { @@ -501,7 +501,7 @@ TELEMETRY_IMPLEMENTATION_TEST("Tracer telemetry API") { } ] )"); - CHECK(is_same_json(series, expected_metrics)); + + for (const auto& s : series) { + if (s.contains("tags")) { + CHECK(s == expected_metrics[0]); + } else { + CHECK(s == expected_metrics[1]); + } + } // Make sure the next heartbeat doesn't contains distributions if no // datapoint has been added to a distribution. From b39d94818320c9ba3f72eb6942732a90fe58e7c9 Mon Sep 17 00:00:00 2001 From: Damien Mehala Date: Tue, 15 Apr 2025 12:01:44 +0200 Subject: [PATCH 7/9] fix windows and formatting --- include/datadog/telemetry/telemetry.h | 6 +++--- test/telemetry/test_telemetry.cpp | 14 ++++---------- 2 files changed, 7 insertions(+), 13 deletions(-) diff --git a/include/datadog/telemetry/telemetry.h b/include/datadog/telemetry/telemetry.h index f8ae4ed6..9828aa2f 100644 --- a/include/datadog/telemetry/telemetry.h +++ b/include/datadog/telemetry/telemetry.h @@ -74,8 +74,8 @@ void report_error_log(std::string message); void report_error_log(std::string message, std::string stacktrace); /// The `counter` namespace provides functions to track values. -/// Counters can be useful for tracking the total number of an event occurring in -/// one time interval. 
For example, the amount of requests, errors or jobs +/// Counters can be useful for tracking the total number of an event occurring +/// in one time interval. For example, the amount of requests, errors or jobs /// processed every 10 seconds. namespace counter { @@ -118,7 +118,7 @@ void set(const Counter& counter, const std::vector& tags, } // namespace counter /// The `rate` namespace provides support for rate metrics-values. -/// Rates can be useful for tracking the total number of an event occurrences in +/// Rates can be useful for tracking the total number of an event occurrences in /// one time interval. For example, the number of requests per second. namespace rate { diff --git a/test/telemetry/test_telemetry.cpp b/test/telemetry/test_telemetry.cpp index 01bcf6b5..4bd1ee3f 100644 --- a/test/telemetry/test_telemetry.cpp +++ b/test/telemetry/test_telemetry.cpp @@ -294,10 +294,9 @@ TELEMETRY_IMPLEMENTATION_TEST("Tracer telemetry lifecycle") { } TELEMETRY_IMPLEMENTATION_TEST("Tracer telemetry API") { - const std::time_t mock_time = 1672484400; - const Clock clock = [&mock_time]() { + const Clock clock = [] { TimePoint result; - result.wall = std::chrono::system_clock::from_time_t(mock_time); + result.wall = std::chrono::system_clock::from_time_t(1672484400); return result; }; @@ -606,11 +605,6 @@ TELEMETRY_IMPLEMENTATION_TEST("Tracer telemetry API") { SECTION("dtor sends metrics and distributions") { // metrics captured before the aggregation task - const Clock clock = [] { - TimePoint result; - result.wall = std::chrono::system_clock::from_time_t(1744706125); - return result; - }; const Distribution response_time{"response_time", "dist-test", false}; const Rate rps{"request", "rate-test", true}; const Counter my_counter{"my_counter", "counter-test", true}; @@ -642,7 +636,7 @@ TELEMETRY_IMPLEMENTATION_TEST("Tracer telemetry API") { "metric":"my_counter", "namespace":"counter-test", "type": "count", - "points": [[1744706125, 1]] + "points": [[1672484400, 1]] } 
)"); CHECK(s == expected_counter); @@ -653,7 +647,7 @@ TELEMETRY_IMPLEMENTATION_TEST("Tracer telemetry API") { "metric":"request", "namespace":"rate-test", "type": "rate", - "points": [[1744706125, 1000]] + "points": [[1672484400, 1000]] } )"); CHECK(s == expected_rate); From f686f992c22ac9e94890d7231c310478e6cba246 Mon Sep 17 00:00:00 2001 From: Damien Mehala Date: Tue, 15 Apr 2025 12:23:32 +0200 Subject: [PATCH 8/9] fix windows 2 --- test/telemetry/test_telemetry.cpp | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/test/telemetry/test_telemetry.cpp b/test/telemetry/test_telemetry.cpp index 4bd1ee3f..a71a3250 100644 --- a/test/telemetry/test_telemetry.cpp +++ b/test/telemetry/test_telemetry.cpp @@ -609,11 +609,11 @@ TELEMETRY_IMPLEMENTATION_TEST("Tracer telemetry API") { const Rate rps{"request", "rate-test", true}; const Counter my_counter{"my_counter", "counter-test", true}; { - Telemetry telemetry{*finalize_config(), logger, client, - scheduler, *url, clock}; - telemetry.increment_counter(my_counter); // = 1 - telemetry.add_datapoint(response_time, 128); - telemetry.set_rate(rps, 1000); + Telemetry tmp_telemetry{*finalize_config(), logger, client, + scheduler, *url, clock}; + tmp_telemetry.increment_counter(my_counter); // = 1 + tmp_telemetry.add_datapoint(response_time, 128); + tmp_telemetry.set_rate(rps, 1000); client->clear(); } From c6e60dc6d96f0be409a39aae133b81ade00c0756 Mon Sep 17 00:00:00 2001 From: Damien Mehala Date: Mon, 21 Apr 2025 17:13:07 +0200 Subject: [PATCH 9/9] Apply suggestions from code review Co-authored-by: pablomartinezbernardo <134320516+pablomartinezbernardo@users.noreply.github.com> --- src/datadog/telemetry/telemetry_impl.h | 3 --- src/datadog/tracer.cpp | 6 +++--- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/src/datadog/telemetry/telemetry_impl.h b/src/datadog/telemetry/telemetry_impl.h index cd077f49..ef72197a 100644 --- a/src/datadog/telemetry/telemetry_impl.h +++ 
b/src/datadog/telemetry/telemetry_impl.h @@ -18,9 +18,6 @@ namespace datadog::telemetry { -// This uses a reference_wrapper so references to internal metric values can -// be captured, and be iterated trivially when the values need to be -// snapshotted and published in telemetry messages. using MetricSnapshot = std::vector>; /// The telemetry class is responsible for handling internal telemetry data to diff --git a/src/datadog/tracer.cpp b/src/datadog/tracer.cpp index 14eb0ad9..1e2e9e17 100644 --- a/src/datadog/tracer.cpp +++ b/src/datadog/tracer.cpp @@ -382,7 +382,7 @@ Expected Tracer::extract_span(const DictReader& reader, const auto span_data_ptr = span_data.get(); telemetry::counter::increment(metrics::tracer::trace_segments_created, - {"new_continued:new"}); + {"new_continued:continued"}); const auto segment = std::make_shared( logger_, collector_, config_manager_->trace_sampler(), span_sampler_, config_manager_->span_defaults(), config_manager_, runtime_id_, @@ -445,11 +445,11 @@ Expected Tracer::inject(const Baggage& baggage, DictWriter& writer) { if (err->code == Error::Code::BAGGAGE_MAXIMUM_BYTES_REACHED) { telemetry::counter::increment( metrics::tracer::context_header_truncated, - {"truncation_reason:baggage_item_count_exceeded"}); + {"truncation_reason:baggage_byte_count_exceeded"}); } else if (err->code == Error::Code::BAGGAGE_MAXIMUM_ITEMS_REACHED) { telemetry::counter::increment( metrics::tracer::context_header_truncated, - {"truncation_reason:baggage_byte_count_exceeded"}); + {"truncation_reason:baggage_item_count_exceeded"}); } }