diff --git a/BUILD.bazel b/BUILD.bazel index eb59ad4f..a95c9d9d 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -1,12 +1,13 @@ cc_library( name = "dd_trace_cpp", srcs = [ + "src/datadog/common/hash.cpp", "src/datadog/telemetry/configuration.cpp", - "src/datadog/telemetry/metrics.cpp", "src/datadog/telemetry/log.h", "src/datadog/telemetry/telemetry.cpp", "src/datadog/telemetry/telemetry_impl.h", "src/datadog/telemetry/telemetry_impl.cpp", + "src/datadog/telemetry/metric_context.h", "src/datadog/baggage.cpp", "src/datadog/base64.cpp", "src/datadog/cerr_logger.cpp", @@ -41,6 +42,7 @@ cc_library( "src/datadog/string_util.cpp", "src/datadog/tag_propagation.cpp", "src/datadog/tags.cpp", + "src/datadog/telemetry_metrics.cpp", "src/datadog/threaded_event_scheduler.cpp", "src/datadog/tracer_config.cpp", "src/datadog/tracer.cpp", @@ -50,6 +52,7 @@ cc_library( "src/datadog/trace_segment.cpp", "src/datadog/version.cpp", "src/datadog/w3c_propagation.cpp", + "src/datadog/common/hash.h", "src/datadog/base64.h", "src/datadog/config_manager.h", "src/datadog/collector_response.h", @@ -74,6 +77,7 @@ cc_library( "src/datadog/string_util.h", "src/datadog/tag_propagation.h", "src/datadog/tags.h", + "src/datadog/telemetry_metrics.h", "src/datadog/threaded_event_scheduler.h", "src/datadog/trace_sampler.h", "src/datadog/w3c_propagation.h", diff --git a/CMakeLists.txt b/CMakeLists.txt index e807b0a0..ecbf5612 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -106,8 +106,8 @@ target_sources(dd_trace_cpp-objects BASE_DIRS include FILES ${public_header_files} PRIVATE + src/datadog/common/hash.cpp src/datadog/telemetry/configuration.cpp - src/datadog/telemetry/metrics.cpp src/datadog/telemetry/telemetry.cpp src/datadog/telemetry/telemetry_impl.cpp src/datadog/baggage.cpp @@ -150,6 +150,7 @@ target_sources(dd_trace_cpp-objects src/datadog/trace_sampler_config.cpp src/datadog/trace_sampler.cpp src/datadog/trace_segment.cpp + src/datadog/telemetry_metrics.cpp src/datadog/version.cpp 
src/datadog/w3c_propagation.cpp ) diff --git a/include/datadog/telemetry/metrics.h b/include/datadog/telemetry/metrics.h index 45beb464..3cca54e3 100644 --- a/include/datadog/telemetry/metrics.h +++ b/include/datadog/telemetry/metrics.h @@ -1,146 +1,34 @@ #pragma once -// This component provides an interface, `Metric`, and specific classes for -// Counter and Gauge metrics. A metric has a name, type, and set of key:value -// tags associated with it. Metrics can be general to APM or language-specific. -// General metrics have `common` set to `true`, and language-specific metrics -// have `common` set to `false`. - -#include #include -#include namespace datadog { namespace telemetry { -class Metric { - // The name of the metric that will be published. A transformation occurs - // based on the name and whether it is "common" or "language-specific" when it - // is recorded. - std::string name_; - // The type of the metric. This will currently be count or gauge. - std::string type_; - // Namespace of the metric. - std::string scope_; - // Tags associated with this specific instance of the metric. - std::vector tags_; - // This affects the transformation of the metric name, where it can be a - // common telemetry metric, or a language-specific metric that is prefixed - // with the language name. 
- bool common_; - - protected: - std::atomic value_ = 0; - Metric(std::string name, std::string type, std::string scope, - std::vector tags, bool common); - - Metric(Metric&& rhs) - : name_(std::move(rhs.name_)), - type_(std::move(rhs.type_)), - scope_(std::move(rhs.scope_)), - tags_(std::move(rhs.tags_)) { - rhs.value_.store(value_.exchange(rhs.value_)); - } - - Metric& operator=(Metric&& rhs) { - if (&rhs != this) { - std::swap(name_, rhs.name_); - std::swap(type_, rhs.type_); - std::swap(scope_, rhs.scope_); - std::swap(tags_, rhs.tags_); - rhs.value_.store(value_.exchange(rhs.value_)); - } - return *this; - } - - public: - // Accessors for name, type, tags, common and capture_and_reset_value are used - // when producing the JSON message for reporting metrics. - std::string name(); - std::string type(); - std::string scope(); - std::vector tags(); - bool common(); - uint64_t value(); - uint64_t capture_and_reset_value(); -}; - -// A count metric is used for measuring activity, and has methods for adding a -// number of actions, or incrementing the current number of actions by 1. -class CounterMetric : public Metric { - public: - CounterMetric(std::string name, std::string scope, - std::vector tags, bool common); - void inc(); - void add(uint64_t amount); +namespace details { +enum class MetricType : char { counter, rate, distribution }; +} + +/// TODO: pre-compute hash? +template +struct Metric final { + /// The type of the metric. + static constexpr details::MetricType type = T; + /// The name of the metric that will be published. A transformation occurs + /// based on the name and whether it is "common" or "language-specific" when + /// it is recorded. + std::string name; + /// Namespace of the metric. + std::string scope; + /// This affects the transformation of the metric name, where it can be a + /// common telemetry metric, or a language-specific metric that is prefixed + /// with the language name. 
+ bool common; }; -// A gauge metric is used for measuring state, and mas methods to set the -// current state, add or subtract from it, or increment/decrement the current -// state by 1. -class GaugeMetric : public Metric { - public: - GaugeMetric(std::string name, std::string scope, - std::vector tags, bool common); - void set(uint64_t value); - void inc(); - void add(uint64_t amount); - void dec(); - void sub(uint64_t amount); -}; - -// This structure contains all the metrics that are exposed by tracer -// telemetry. -struct DefaultMetrics { - struct { - telemetry::CounterMetric spans_created = { - "spans_created", "tracers", {}, true}; - telemetry::CounterMetric spans_finished = { - "spans_finished", "tracers", {}, true}; - - telemetry::CounterMetric trace_segments_created_new = { - "trace_segments_created", "tracers", {"new_continued:new"}, true}; - telemetry::CounterMetric trace_segments_created_continued = { - "trace_segments_created", "tracers", {"new_continued:continued"}, true}; - telemetry::CounterMetric trace_segments_closed = { - "trace_segments_closed", "tracers", {}, true}; - telemetry::CounterMetric baggage_items_exceeded = { - "context_header.truncated", - "tracers", - {{"truncation_reason:baggage_item_count_exceeded"}}, - true, - }; - telemetry::CounterMetric baggage_bytes_exceeded = { - "context_header.truncated", - "tracers", - {{"truncation_reason:baggage_byte_count_exceeded"}}, - true, - }; - } tracer; - struct { - telemetry::CounterMetric requests = { - "trace_api.requests", "tracers", {}, true}; - - telemetry::CounterMetric responses_1xx = { - "trace_api.responses", "tracers", {"status_code:1xx"}, true}; - telemetry::CounterMetric responses_2xx = { - "trace_api.responses", "tracers", {"status_code:2xx"}, true}; - telemetry::CounterMetric responses_3xx = { - "trace_api.responses", "tracers", {"status_code:3xx"}, true}; - telemetry::CounterMetric responses_4xx = { - "trace_api.responses", "tracers", {"status_code:4xx"}, true}; - 
telemetry::CounterMetric responses_5xx = { - "trace_api.responses", "tracers", {"status_code:5xx"}, true}; - - telemetry::CounterMetric errors_timeout = { - "trace_api.errors", "tracers", {"type:timeout"}, true}; - telemetry::CounterMetric errors_network = { - "trace_api.errors", "tracers", {"type:network"}, true}; - telemetry::CounterMetric errors_status_code = { - "trace_api.errors", "tracers", {"type:status_code"}, true}; - - } trace_api; -}; +using Counter = Metric; +using Rate = Metric; +using Distribution = Metric; } // namespace telemetry } // namespace datadog diff --git a/include/datadog/telemetry/telemetry.h b/include/datadog/telemetry/telemetry.h index b30f696f..9828aa2f 100644 --- a/include/datadog/telemetry/telemetry.h +++ b/include/datadog/telemetry/telemetry.h @@ -31,7 +31,6 @@ namespace datadog::telemetry { void init(FinalizedConfiguration configuration, std::shared_ptr logger, std::shared_ptr client, - std::vector> metrics, std::shared_ptr event_scheduler, tracing::HTTPClient::URL agent_url, tracing::Clock clock = tracing::default_clock); @@ -58,10 +57,6 @@ void send_configuration_change(); void capture_configuration_change( const std::vector& new_configuration); -/// Provides access to the telemetry metrics for updating the values. -/// This value should not be stored. -DefaultMetrics& metrics(); - /// Report internal warning message to Datadog. /// /// @param message The warning message to log. @@ -78,4 +73,89 @@ void report_error_log(std::string message); /// @param stacktrace Stacktrace leading to the error. void report_error_log(std::string message, std::string stacktrace); +/// The `counter` namespace provides functions to track values. +/// Counters can be useful for tracking the total number of an event occurring +/// in one time interval. For example, the amount of requests, errors or jobs +/// processed every 10 seconds. +namespace counter { + +/// Increments the specified counter by 1. +/// +/// @param `counter` the counter to increment. 
+void increment(const Counter& counter); + +/// Increments the specified counter by 1. +/// +/// @param `counter` the counter to increment. +/// @param `tags` the counter tags. +void increment(const Counter& counter, const std::vector& tags); + +/// Decrements the specified counter by 1. +/// +/// @param `counter` the counter to decrement. +void decrement(const Counter& counter); + +/// Decrements the specified counter by 1. +/// +/// @param `counter` the counter to decrement. +/// @param `tags` the counter tags. +void decrement(const Counter& counter, const std::vector& tags); + +/// Sets the counter to a specific value. +/// +/// @param `counter` the counter to update. +/// @param `value` the value to assign to the counter. +void set(const Counter& counter, uint64_t value); + +/// Sets the counter to a specific value. +/// +/// @param `counter` the counter to update. +/// @param `tags` the counter tags. +/// @param `value` the value to assign to the counter. +void set(const Counter& counter, const std::vector& tags, + uint64_t value); + +} // namespace counter + +/// The `rate` namespace provides support for rate metrics. +/// Rates can be useful for tracking the number of occurrences of an event in +/// one time interval. For example, the number of requests per second. +namespace rate { + +/// Sets the rate to a specific value. +/// +/// @param `rate` the rate to update. +/// @param `value` the value to assign to the rate. +void set(const Rate& rate, uint64_t value); + +/// Sets the rate to a specific value. +/// +/// @param `rate` the rate to update. +/// @param `tags` the rate tags. +/// @param `value` the value to assign to the rate. +void set(const Rate& rate, const std::vector&, uint64_t value); + +} // namespace rate + +/// The `distribution` namespace provides support for statistical distribution. +/// Distribution can be useful for tracking things like response times or +/// payload sizes. 
+namespace distribution { + +/// Adds a value to the distribution. +/// +/// @param `distribution` the distribution to update. +/// @param `value` the value to add to the distribution. +void add(const Distribution& distribution, uint64_t value); + +/// Adds a value to the distribution. +/// +/// @param `distribution` the distribution to update. +/// @param `tags` the distribution tags. +/// @param `value` the value to add to the distribution. +void add(const Distribution& distribution, const std::vector& tags, + uint64_t value); + +} // namespace distribution + } // namespace datadog::telemetry diff --git a/src/datadog/common/hash.cpp b/src/datadog/common/hash.cpp new file mode 100644 index 00000000..dcf5736f --- /dev/null +++ b/src/datadog/common/hash.cpp @@ -0,0 +1,317 @@ +#include "hash.h" + +#include + +#if _MSC_VER +#pragma warning(push) +#pragma warning(disable : 4267) +#endif + +constexpr bool ALLOW_UNALIGNED_READS = true; + +namespace datadog::common { +namespace details { + +void SpookyHash::Short(const void *message, size_t length, uint64 *hash1, + uint64 *hash2) { + uint64 buf[2 * sc_numVars]; + union { + const uint8 *p8; + uint32 *p32; + uint64 *p64; + size_t i; + } u; + + u.p8 = (const uint8 *)message; + + if constexpr (!ALLOW_UNALIGNED_READS && (u.i & 0x7)) { + memcpy(buf, message, length); + u.p64 = buf; + } + + size_t remainder = length % 32; + uint64 a = *hash1; + uint64 b = *hash2; + uint64 c = sc_const; + uint64 d = sc_const; + + if (length > 15) { + const uint64 *end = u.p64 + (length / 32) * 4; + + // handle all complete sets of 32 bytes + for (; u.p64 < end; u.p64 += 4) { + c += u.p64[0]; + d += u.p64[1]; + ShortMix(a, b, c, d); + a += u.p64[2]; + b += u.p64[3]; + } + + // Handle the case of 16+ remaining bytes. 
+ if (remainder >= 16) { + c += u.p64[0]; + d += u.p64[1]; + ShortMix(a, b, c, d); + u.p64 += 2; + remainder -= 16; + } + } + + // Handle the last 0..15 bytes, and its length + d += ((uint64)length) << 56; + switch (remainder) { + case 15: + d += ((uint64)u.p8[14]) << 48; + [[fallthrough]]; + case 14: + d += ((uint64)u.p8[13]) << 40; + [[fallthrough]]; + case 13: + d += ((uint64)u.p8[12]) << 32; + [[fallthrough]]; + case 12: + d += u.p32[2]; + c += u.p64[0]; + break; + case 11: + d += ((uint64)u.p8[10]) << 16; + [[fallthrough]]; + case 10: + d += ((uint64)u.p8[9]) << 8; + [[fallthrough]]; + case 9: + d += (uint64)u.p8[8]; + [[fallthrough]]; + case 8: + c += u.p64[0]; + break; + case 7: + c += ((uint64)u.p8[6]) << 48; + [[fallthrough]]; + case 6: + c += ((uint64)u.p8[5]) << 40; + [[fallthrough]]; + case 5: + c += ((uint64)u.p8[4]) << 32; + [[fallthrough]]; + case 4: + c += u.p32[0]; + break; + case 3: + c += ((uint64)u.p8[2]) << 16; + [[fallthrough]]; + case 2: + c += ((uint64)u.p8[1]) << 8; + [[fallthrough]]; + case 1: + c += (uint64)u.p8[0]; + break; + case 0: + c += sc_const; + d += sc_const; + } + ShortEnd(a, b, c, d); + *hash1 = a; + *hash2 = b; +} + +// do the whole hash in one call +void SpookyHash::Hash128(const void *message, size_t length, uint64 *hash1, + uint64 *hash2) { + if (length < sc_bufSize) { + Short(message, length, hash1, hash2); + return; + } + + uint64 h0, h1, h2, h3, h4, h5, h6, h7, h8, h9, h10, h11; + uint64 buf[sc_numVars]; + uint64 *end; + union { + const uint8 *p8; + uint64 *p64; + size_t i; + } u; + size_t remainder; + + h0 = h3 = h6 = h9 = *hash1; + h1 = h4 = h7 = h10 = *hash2; + h2 = h5 = h8 = h11 = sc_const; + + u.p8 = (const uint8 *)message; + end = u.p64 + (length / sc_blockSize) * sc_numVars; + + // handle all whole sc_blockSize blocks of bytes + if constexpr (ALLOW_UNALIGNED_READS || ((u.i & 0x7) == 0)) { + while (u.p64 < end) { + Mix(u.p64, h0, h1, h2, h3, h4, h5, h6, h7, h8, h9, h10, h11); + u.p64 += sc_numVars; + } + } else { + 
while (u.p64 < end) { + memcpy(buf, u.p64, sc_blockSize); + Mix(buf, h0, h1, h2, h3, h4, h5, h6, h7, h8, h9, h10, h11); + u.p64 += sc_numVars; + } + } + + // handle the last partial block of sc_blockSize bytes + remainder = (length - ((const uint8 *)end - (const uint8 *)message)); + memcpy(buf, end, remainder); + memset(((uint8 *)buf) + remainder, 0, sc_blockSize - remainder); + ((uint8 *)buf)[sc_blockSize - 1] = remainder; + + // do some final mixing + End(buf, h0, h1, h2, h3, h4, h5, h6, h7, h8, h9, h10, h11); + *hash1 = h0; + *hash2 = h1; +} + +// init spooky state +void SpookyHash::Init(uint64 seed1, uint64 seed2) { + m_length = 0; + m_remainder = 0; + m_state[0] = seed1; + m_state[1] = seed2; +} + +// add a message fragment to the state +void SpookyHash::Update(const void *message, size_t length) { + uint64 h0, h1, h2, h3, h4, h5, h6, h7, h8, h9, h10, h11; + size_t newLength = length + m_remainder; + uint8 remainder; + union { + const uint8 *p8; + uint64 *p64; + size_t i; + } u; + const uint64 *end; + + // Is this message fragment too short? If it is, stuff it away. 
+ if (newLength < sc_bufSize) { + memcpy(&((uint8 *)m_data)[m_remainder], message, length); + m_length = length + m_length; + m_remainder = (uint8)newLength; + return; + } + + // init the variables + if (m_length < sc_bufSize) { + h0 = h3 = h6 = h9 = m_state[0]; + h1 = h4 = h7 = h10 = m_state[1]; + h2 = h5 = h8 = h11 = sc_const; + } else { + h0 = m_state[0]; + h1 = m_state[1]; + h2 = m_state[2]; + h3 = m_state[3]; + h4 = m_state[4]; + h5 = m_state[5]; + h6 = m_state[6]; + h7 = m_state[7]; + h8 = m_state[8]; + h9 = m_state[9]; + h10 = m_state[10]; + h11 = m_state[11]; + } + m_length = length + m_length; + + // if we've got anything stuffed away, use it now + if (m_remainder) { + uint8 prefix = sc_bufSize - m_remainder; + memcpy(&(((uint8 *)m_data)[m_remainder]), message, prefix); + u.p64 = m_data; + Mix(u.p64, h0, h1, h2, h3, h4, h5, h6, h7, h8, h9, h10, h11); + Mix(&u.p64[sc_numVars], h0, h1, h2, h3, h4, h5, h6, h7, h8, h9, h10, h11); + u.p8 = ((const uint8 *)message) + prefix; + length -= prefix; + } else { + u.p8 = (const uint8 *)message; + } + + // handle all whole blocks of sc_blockSize bytes + end = u.p64 + (length / sc_blockSize) * sc_numVars; + remainder = (uint8)(length - ((const uint8 *)end - u.p8)); + if constexpr (ALLOW_UNALIGNED_READS || (u.i & 0x7) == 0) { + while (u.p64 < end) { + Mix(u.p64, h0, h1, h2, h3, h4, h5, h6, h7, h8, h9, h10, h11); + u.p64 += sc_numVars; + } + } else { + while (u.p64 < end) { + memcpy(m_data, u.p8, sc_blockSize); + Mix(m_data, h0, h1, h2, h3, h4, h5, h6, h7, h8, h9, h10, h11); + u.p64 += sc_numVars; + } + } + + // stuff away the last few bytes + m_remainder = remainder; + memcpy(m_data, end, remainder); + + // stuff away the variables + m_state[0] = h0; + m_state[1] = h1; + m_state[2] = h2; + m_state[3] = h3; + m_state[4] = h4; + m_state[5] = h5; + m_state[6] = h6; + m_state[7] = h7; + m_state[8] = h8; + m_state[9] = h9; + m_state[10] = h10; + m_state[11] = h11; +} + +// report the hash for the concatenation of all message 
fragments so far +void SpookyHash::Final(uint64 *hash1, uint64 *hash2) { + // init the variables + if (m_length < sc_bufSize) { + *hash1 = m_state[0]; + *hash2 = m_state[1]; + Short(m_data, m_length, hash1, hash2); + return; + } + + const uint64 *data = (const uint64 *)m_data; + uint8 remainder = m_remainder; + + uint64 h0 = m_state[0]; + uint64 h1 = m_state[1]; + uint64 h2 = m_state[2]; + uint64 h3 = m_state[3]; + uint64 h4 = m_state[4]; + uint64 h5 = m_state[5]; + uint64 h6 = m_state[6]; + uint64 h7 = m_state[7]; + uint64 h8 = m_state[8]; + uint64 h9 = m_state[9]; + uint64 h10 = m_state[10]; + uint64 h11 = m_state[11]; + + if (remainder >= sc_blockSize) { + // m_data can contain two blocks; handle any whole first block + Mix(data, h0, h1, h2, h3, h4, h5, h6, h7, h8, h9, h10, h11); + data += sc_numVars; + remainder -= sc_blockSize; + } + + // mix in the last partial block, and the length mod sc_blockSize + memset(&((uint8 *)data)[remainder], 0, (sc_blockSize - remainder)); + + ((uint8 *)data)[sc_blockSize - 1] = remainder; + + // do some final mixing + End(data, h0, h1, h2, h3, h4, h5, h6, h7, h8, h9, h10, h11); + + *hash1 = h0; + *hash2 = h1; +} + +} // namespace details + +} // namespace datadog::common + +#if _MSC_VER +#pragma warning(pop) +#endif diff --git a/src/datadog/common/hash.h b/src/datadog/common/hash.h new file mode 100644 index 00000000..3d2155d8 --- /dev/null +++ b/src/datadog/common/hash.h @@ -0,0 +1,448 @@ +#pragma once + +#include + +// clang-format off +#ifdef _MSC_VER +# define INLINE __forceinline + typedef unsigned __int64 uint64; + typedef unsigned __int32 uint32; + typedef unsigned __int16 uint16; + typedef unsigned __int8 uint8; +#else +# define INLINE inline +# include + typedef uint64_t uint64; + typedef uint32_t uint32; + typedef uint16_t uint16; + typedef uint8_t uint8; +#endif +// clang-format on + +namespace datadog::common { +namespace details { +// +// SpookyHash: a 128-bit noncryptographic hash function +// By Bob Jenkins, public 
domain +// Oct 31 2010: alpha, framework + SpookyHash::Mix appears right +// Oct 31 2011: alpha again, Mix only good to 2^^69 but rest appears right +// Dec 31 2011: beta, improved Mix, tested it for 2-bit deltas +// Feb 2 2012: production, same bits as beta +// Feb 5 2012: adjusted definitions of uint* to be more portable +// Mar 30 2012: 3 bytes/cycle, not 4. Alpha was 4 but wasn't thorough enough. +// August 5 2012: SpookyV2 (different results) +// +// Up to 3 bytes/cycle for long messages. Reasonably fast for short messages. +// All 1 or 2 bit deltas achieve avalanche within 1% bias per output bit. +// +// This was developed for and tested on 64-bit x86-compatible processors. +// It assumes the processor is little-endian. There is a macro +// controlling whether unaligned reads are allowed (by default they are). +// This should be an equally good hash on big-endian machines, but it will +// compute different results on them than on little-endian machines. +// +// Google's CityHash has similar specs to SpookyHash, and CityHash is faster +// on new Intel boxes. MD4 and MD5 also have similar specs, but they are orders +// of magnitude slower. CRCs are two or more times slower, but unlike +// SpookyHash, they have nice math for combining the CRCs of pieces to form +// the CRCs of wholes. There are also cryptographic hashes, but those are even +// slower than MD5. 
+// +// Source: +class SpookyHash { + public: + // + // SpookyHash: hash a single message in one call, produce 128-bit output + // + static void Hash128(const void *message, // message to hash + size_t length, // length of message in bytes + uint64 *hash1, // in/out: in seed 1, out hash value 1 + uint64 *hash2); // in/out: in seed 2, out hash value 2 + + // + // Hash64: hash a single message in one call, return 64-bit output + // + static uint64 Hash64(const void *message, // message to hash + size_t length, // length of message in bytes + uint64 seed) // seed + { + uint64 hash1 = seed; + Hash128(message, length, &hash1, &seed); + return hash1; + } + + // + // Hash32: hash a single message in one call, produce 32-bit output + // + static uint32 Hash32(const void *message, // message to hash + size_t length, // length of message in bytes + uint32 seed) // seed + { + uint64 hash1 = seed, hash2 = seed; + Hash128(message, length, &hash1, &hash2); + return (uint32)hash1; + } + + // + // Init: initialize the context of a SpookyHash + // + void Init(uint64 seed1, // any 64-bit value will do, including 0 + uint64 seed2); // different seeds produce independent hashes + + // + // Update: add a piece of a message to a SpookyHash state + // + void Update(const void *message, // message fragment + size_t length); // length of message fragment in bytes + + // + // Final: compute the hash for the current SpookyHash state + // + // This does not modify the state; you can keep updating it afterward + // + // The result is the same as if SpookyHash() had been called with + // all the pieces concatenated into one message. + // + void Final(uint64 *hash1, // out only: first 64 bits of hash value. + uint64 *hash2); // out only: second 64 bits of hash value. + + // + // left rotate a 64-bit value by k bits + // + static INLINE uint64 Rot64(uint64 x, int k) { + return (x << k) | (x >> (64 - k)); + } + + // + // This is used if the input is 96 bytes long or longer. 
+ // + // The internal state is fully overwritten every 96 bytes. + // Every input bit appears to cause at least 128 bits of entropy + // before 96 other bytes are combined, when run forward or backward + // For every input bit, + // Two inputs differing in just that input bit + // Where "differ" means xor or subtraction + // And the base value is random + // When run forward or backwards one Mix + // I tried 3 pairs of each; they all differed by at least 212 bits. + // + static INLINE void Mix(const uint64 *data, uint64 &s0, uint64 &s1, uint64 &s2, + uint64 &s3, uint64 &s4, uint64 &s5, uint64 &s6, + uint64 &s7, uint64 &s8, uint64 &s9, uint64 &s10, + uint64 &s11) { + s0 += data[0]; + s2 ^= s10; + s11 ^= s0; + s0 = Rot64(s0, 11); + s11 += s1; + s1 += data[1]; + s3 ^= s11; + s0 ^= s1; + s1 = Rot64(s1, 32); + s0 += s2; + s2 += data[2]; + s4 ^= s0; + s1 ^= s2; + s2 = Rot64(s2, 43); + s1 += s3; + s3 += data[3]; + s5 ^= s1; + s2 ^= s3; + s3 = Rot64(s3, 31); + s2 += s4; + s4 += data[4]; + s6 ^= s2; + s3 ^= s4; + s4 = Rot64(s4, 17); + s3 += s5; + s5 += data[5]; + s7 ^= s3; + s4 ^= s5; + s5 = Rot64(s5, 28); + s4 += s6; + s6 += data[6]; + s8 ^= s4; + s5 ^= s6; + s6 = Rot64(s6, 39); + s5 += s7; + s7 += data[7]; + s9 ^= s5; + s6 ^= s7; + s7 = Rot64(s7, 57); + s6 += s8; + s8 += data[8]; + s10 ^= s6; + s7 ^= s8; + s8 = Rot64(s8, 55); + s7 += s9; + s9 += data[9]; + s11 ^= s7; + s8 ^= s9; + s9 = Rot64(s9, 54); + s8 += s10; + s10 += data[10]; + s0 ^= s8; + s9 ^= s10; + s10 = Rot64(s10, 22); + s9 += s11; + s11 += data[11]; + s1 ^= s9; + s10 ^= s11; + s11 = Rot64(s11, 46); + s10 += s0; + } + + // + // Mix all 12 inputs together so that h0, h1 are a hash of them all. 
+ // + // For two inputs differing in just the input bits + // Where "differ" means xor or subtraction + // And the base value is random, or a counting value starting at that bit + // The final result will have each bit of h0, h1 flip + // For every input bit, + // with probability 50 +- .3% + // For every pair of input bits, + // with probability 50 +- 3% + // + // This does not rely on the last Mix() call having already mixed some. + // Two iterations was almost good enough for a 64-bit result, but a + // 128-bit result is reported, so End() does three iterations. + // + static INLINE void EndPartial(uint64 &h0, uint64 &h1, uint64 &h2, uint64 &h3, + uint64 &h4, uint64 &h5, uint64 &h6, uint64 &h7, + uint64 &h8, uint64 &h9, uint64 &h10, + uint64 &h11) { + h11 += h1; + h2 ^= h11; + h1 = Rot64(h1, 44); + h0 += h2; + h3 ^= h0; + h2 = Rot64(h2, 15); + h1 += h3; + h4 ^= h1; + h3 = Rot64(h3, 34); + h2 += h4; + h5 ^= h2; + h4 = Rot64(h4, 21); + h3 += h5; + h6 ^= h3; + h5 = Rot64(h5, 38); + h4 += h6; + h7 ^= h4; + h6 = Rot64(h6, 33); + h5 += h7; + h8 ^= h5; + h7 = Rot64(h7, 10); + h6 += h8; + h9 ^= h6; + h8 = Rot64(h8, 13); + h7 += h9; + h10 ^= h7; + h9 = Rot64(h9, 38); + h8 += h10; + h11 ^= h8; + h10 = Rot64(h10, 53); + h9 += h11; + h0 ^= h9; + h11 = Rot64(h11, 42); + h10 += h0; + h1 ^= h10; + h0 = Rot64(h0, 54); + } + + static INLINE void End(const uint64 *data, uint64 &h0, uint64 &h1, uint64 &h2, + uint64 &h3, uint64 &h4, uint64 &h5, uint64 &h6, + uint64 &h7, uint64 &h8, uint64 &h9, uint64 &h10, + uint64 &h11) { + h0 += data[0]; + h1 += data[1]; + h2 += data[2]; + h3 += data[3]; + h4 += data[4]; + h5 += data[5]; + h6 += data[6]; + h7 += data[7]; + h8 += data[8]; + h9 += data[9]; + h10 += data[10]; + h11 += data[11]; + EndPartial(h0, h1, h2, h3, h4, h5, h6, h7, h8, h9, h10, h11); + EndPartial(h0, h1, h2, h3, h4, h5, h6, h7, h8, h9, h10, h11); + EndPartial(h0, h1, h2, h3, h4, h5, h6, h7, h8, h9, h10, h11); + } + + // + // The goal is for each bit of the input to expand 
into 128 bits of + // apparent entropy before it is fully overwritten. + // n trials both set and cleared at least m bits of h0 h1 h2 h3 + // n: 2 m: 29 + // n: 3 m: 46 + // n: 4 m: 57 + // n: 5 m: 107 + // n: 6 m: 146 + // n: 7 m: 152 + // when run forwards or backwards + // for all 1-bit and 2-bit diffs + // with diffs defined by either xor or subtraction + // with a base of all zeros plus a counter, or plus another bit, or random + // + static INLINE void ShortMix(uint64 &h0, uint64 &h1, uint64 &h2, uint64 &h3) { + h2 = Rot64(h2, 50); + h2 += h3; + h0 ^= h2; + h3 = Rot64(h3, 52); + h3 += h0; + h1 ^= h3; + h0 = Rot64(h0, 30); + h0 += h1; + h2 ^= h0; + h1 = Rot64(h1, 41); + h1 += h2; + h3 ^= h1; + h2 = Rot64(h2, 54); + h2 += h3; + h0 ^= h2; + h3 = Rot64(h3, 48); + h3 += h0; + h1 ^= h3; + h0 = Rot64(h0, 38); + h0 += h1; + h2 ^= h0; + h1 = Rot64(h1, 37); + h1 += h2; + h3 ^= h1; + h2 = Rot64(h2, 62); + h2 += h3; + h0 ^= h2; + h3 = Rot64(h3, 34); + h3 += h0; + h1 ^= h3; + h0 = Rot64(h0, 5); + h0 += h1; + h2 ^= h0; + h1 = Rot64(h1, 36); + h1 += h2; + h3 ^= h1; + } + + // + // Mix all 4 inputs together so that h0, h1 are a hash of them all. 
+ // + // For two inputs differing in just the input bits + // Where "differ" means xor or subtraction + // And the base value is random, or a counting value starting at that bit + // The final result will have each bit of h0, h1 flip + // For every input bit, + // with probability 50 +- .3% (it is probably better than that) + // For every pair of input bits, + // with probability 50 +- .75% (the worst case is approximately that) + // + static INLINE void ShortEnd(uint64 &h0, uint64 &h1, uint64 &h2, uint64 &h3) { + h3 ^= h2; + h2 = Rot64(h2, 15); + h3 += h2; + h0 ^= h3; + h3 = Rot64(h3, 52); + h0 += h3; + h1 ^= h0; + h0 = Rot64(h0, 26); + h1 += h0; + h2 ^= h1; + h1 = Rot64(h1, 51); + h2 += h1; + h3 ^= h2; + h2 = Rot64(h2, 28); + h3 += h2; + h0 ^= h3; + h3 = Rot64(h3, 9); + h0 += h3; + h1 ^= h0; + h0 = Rot64(h0, 47); + h1 += h0; + h2 ^= h1; + h1 = Rot64(h1, 54); + h2 += h1; + h3 ^= h2; + h2 = Rot64(h2, 32); + h3 += h2; + h0 ^= h3; + h3 = Rot64(h3, 25); + h0 += h3; + h1 ^= h0; + h0 = Rot64(h0, 63); + h1 += h0; + } + + private: + // + // Short is used for messages under 192 bytes in length + // Short has a low startup cost, the normal mode is good for long + // keys, the cost crossover is at about 192 bytes. The two modes were + // held to the same quality bar. 
+ // + static void Short( + const void *message, // message (array of bytes, not necessarily aligned) + size_t length, // length of message (in bytes) + uint64 *hash1, // in/out: in the seed, out the hash value + uint64 *hash2); // in/out: in the seed, out the hash value + + // number of uint64's in internal state + static const size_t sc_numVars = 12; + + // size of the internal state + static const size_t sc_blockSize = sc_numVars * 8; + + // size of buffer of unhashed data, in bytes + static const size_t sc_bufSize = 2 * sc_blockSize; + + // + // sc_const: a constant which: + // * is not zero + // * is odd + // * is a not-very-regular mix of 1's and 0's + // * does not need any other special mathematical properties + // + static const uint64 sc_const = 0xdeadbeefdeadbeefULL; + + uint64 m_data[2 * sc_numVars]; // unhashed data, for partial messages + uint64 m_state[sc_numVars]; // internal state of the hash + size_t m_length; // total length of the input so far + uint8 m_remainder; // length of unhashed data stashed in m_data +}; +} // namespace details + +/// A fast non-cryptographic 64-bit hash wrapper based on SpookyHash. +/// +/// This class provides a convenient interface for incremental hashing using +/// the SpookyHash algorithm. +class FastHash final { + details::SpookyHash hasher_; + + public: + /// Constructs a new FastHash instance with a given seed. + /// + /// @param seed A 64-bit seed to initialize the hash state. + FastHash(uint64 seed) { hasher_.Init(seed, 1528); } + + /// Appends data to the hash. + /// + /// This method can be called multiple times to hash input data in chunks. + /// + /// @param data Pointer to the data to hash. + /// @param length Size of the data in bytes. + void append(const void *data, size_t length) { hasher_.Update(data, length); } + + /// Finalizes the hash computation and returns a 64-bit hash value. 
+ /// + /// This method finalizes the internal state and returns the first + /// 64 bits of the 128-bit SpookyHash output. + /// + /// @return A 64-bit hash value. + uint64 final() { + uint64 res[2]; + hasher_.Final(&res[0], &res[1]); + return res[0]; + } +}; + +} // namespace datadog::common diff --git a/src/datadog/datadog_agent.cpp b/src/datadog/datadog_agent.cpp index af925797..e3fb611e 100644 --- a/src/datadog/datadog_agent.cpp +++ b/src/datadog/datadog_agent.cpp @@ -18,6 +18,7 @@ #include "json.hpp" #include "msgpack.h" #include "span_data.h" +#include "telemetry_metrics.h" #include "trace_sampler.h" namespace datadog { @@ -252,15 +253,20 @@ void DatadogAgent::flush() { const DictReader& /*response_headers*/, std::string response_body) { if (response_status >= 500) { - telemetry::metrics().trace_api.responses_5xx.inc(); + telemetry::counter::increment(metrics::tracer::api::responses, + {"status_code:5xx"}); } else if (response_status >= 400) { - telemetry::metrics().trace_api.responses_4xx.inc(); + telemetry::counter::increment(metrics::tracer::api::responses, + {"status_code:4xx"}); } else if (response_status >= 300) { - telemetry::metrics().trace_api.responses_3xx.inc(); + telemetry::counter::increment(metrics::tracer::api::responses, + {"status_code:3xx"}); } else if (response_status >= 200) { - telemetry::metrics().trace_api.responses_2xx.inc(); + telemetry::counter::increment(metrics::tracer::api::responses, + {"status_code:2xx"}); } else if (response_status >= 100) { - telemetry::metrics().trace_api.responses_1xx.inc(); + telemetry::counter::increment(metrics::tracer::api::responses, + {"status_code:1xx"}); } if (response_status != 200) { logger->log_error([&](auto& stream) { @@ -298,12 +304,13 @@ void DatadogAgent::flush() { // request or retrieving the response. It's invoked // asynchronously. 
auto on_error = [logger = logger_](Error error) { - telemetry::metrics().trace_api.errors_network.inc(); + telemetry::counter::increment(metrics::tracer::api::errors, + {"type:network"}); logger->log_error(error.with_prefix( "Error occurred during HTTP request for submitting traces: ")); }; - telemetry::metrics().trace_api.requests.inc(); + telemetry::counter::increment(metrics::tracer::api::requests); auto post_result = http_client_->post(traces_endpoint_, std::move(set_request_headers), std::move(body), std::move(on_response), diff --git a/src/datadog/telemetry/metric_context.h b/src/datadog/telemetry/metric_context.h new file mode 100644 index 00000000..f0a13387 --- /dev/null +++ b/src/datadog/telemetry/metric_context.h @@ -0,0 +1,67 @@ +#pragma once + +#include + +#include "common/hash.h" + +namespace datadog::telemetry { + +// TODO: `enable_if_t` is same `Metric` + +/// `MetricContext` is a unique identifier for metrics. +/// It depends on the kind of metric, its name, scope, common and the set of +/// tags. +template +struct MetricContext final { + /// The metric definition. + Metric id; + /// Set of tags. 
+ std::vector tags; + + std::size_t hash() const { + common::FastHash h(3028); + h.append(id.name.data(), id.name.size()); + h.append(id.scope.data(), id.scope.size()); + h.append(&id.common, sizeof(id.common)); + for (const auto& t : tags) { + h.append(t.data(), t.size()); + } + return static_cast(h.final()); + } + + bool operator==(const MetricContext& rhs) const { + return id.name == rhs.id.name && id.scope == rhs.id.scope && + id.common == rhs.id.common && tags == rhs.tags; + } +}; + +} // namespace datadog::telemetry + +template <> +struct std::hash< + datadog::telemetry::MetricContext> { + std::size_t operator()( + const datadog::telemetry::MetricContext& + counter) const { + return counter.hash(); + } +}; + +template <> +struct std::hash> { + std::size_t operator()( + const datadog::telemetry::MetricContext& rate) + const { + return rate.hash(); + } +}; + +template <> +struct std::hash< + datadog::telemetry::MetricContext> { + std::size_t operator()( + const datadog::telemetry::MetricContext& + distribution) const { + return distribution.hash(); + } +}; diff --git a/src/datadog/telemetry/metrics.cpp b/src/datadog/telemetry/metrics.cpp deleted file mode 100644 index 8140cad3..00000000 --- a/src/datadog/telemetry/metrics.cpp +++ /dev/null @@ -1,43 +0,0 @@ -#include - -namespace datadog { -namespace telemetry { - -Metric::Metric(std::string name, std::string type, std::string scope, - std::vector tags, bool common) - : name_(std::move(name)), - type_(std::move(type)), - scope_(std::move(scope)), - tags_(std::move(tags)), - common_(common) {} -std::string Metric::name() { return name_; } -std::string Metric::type() { return type_; } -std::string Metric::scope() { return scope_; } -std::vector Metric::tags() { return tags_; } -bool Metric::common() { return common_; } -uint64_t Metric::value() { return value_; } -uint64_t Metric::capture_and_reset_value() { return value_.exchange(0); } - -CounterMetric::CounterMetric(std::string name, std::string scope, - 
std::vector tags, bool common) - : Metric(name, "count", scope, tags, common) {} -void CounterMetric::inc() { add(1); } -void CounterMetric::add(uint64_t amount) { value_ += amount; } - -GaugeMetric::GaugeMetric(std::string name, std::string scope, - std::vector tags, bool common) - : Metric(name, "gauge", scope, tags, common) {} -void GaugeMetric::set(uint64_t value) { value_ = value; } -void GaugeMetric::inc() { add(1); } -void GaugeMetric::add(uint64_t amount) { value_ += amount; } -void GaugeMetric::dec() { sub(1); } -void GaugeMetric::sub(uint64_t amount) { - if (amount > value_) { - value_ = 0; - } else { - value_ -= amount; - } -} - -} // namespace telemetry -} // namespace datadog diff --git a/src/datadog/telemetry/telemetry.cpp b/src/datadog/telemetry/telemetry.cpp index 2c03f6be..7661edf1 100644 --- a/src/datadog/telemetry/telemetry.cpp +++ b/src/datadog/telemetry/telemetry.cpp @@ -24,15 +24,11 @@ using NoopTelemetry = std::monostate; /// implementation. using TelemetryProxy = std::variant; -// NOTE(@dmehala): until metrics handling is improved. -static DefaultMetrics noop_metrics; - /// NOTE(@dmehala): Here to facilitate Meyer's singleton construction. 
struct Ctor_param final { FinalizedConfiguration configuration; std::shared_ptr logger; std::shared_ptr client; - std::vector> metrics; std::shared_ptr scheduler; tracing::HTTPClient::URL agent_url; tracing::Clock clock = tracing::default_clock; @@ -41,8 +37,7 @@ struct Ctor_param final { TelemetryProxy make_telemetry(const Ctor_param& init) { if (!init.configuration.enabled) return NoopTelemetry{}; return Telemetry{init.configuration, init.logger, init.client, - init.metrics, init.scheduler, init.agent_url, - init.clock}; + init.scheduler, init.agent_url, init.clock}; } TelemetryProxy& instance( @@ -54,11 +49,10 @@ TelemetryProxy& instance( void init(FinalizedConfiguration configuration, std::shared_ptr logger, std::shared_ptr client, - std::vector> metrics, std::shared_ptr event_scheduler, tracing::HTTPClient::URL agent_url, tracing::Clock clock) { - instance(Ctor_param{configuration, logger, client, metrics, event_scheduler, - agent_url, clock}); + instance(Ctor_param{configuration, logger, client, event_scheduler, agent_url, + clock}); } void send_configuration_change() { @@ -81,15 +75,6 @@ void capture_configuration_change( instance()); } -DefaultMetrics& metrics() { - auto& proxy = instance(); - if (std::holds_alternative(proxy)) { - return noop_metrics; - } else { - return std::get(proxy).metrics(); - } -} - void report_warning_log(std::string message) { std::visit(details::Overload{ [&](Telemetry& telemetry) { telemetry.log_warning(message); }, @@ -116,4 +101,110 @@ void report_error_log(std::string message, std::string stacktrace) { instance()); } +namespace counter { +void increment(const Counter& counter) { + std::visit( + details::Overload{ + [&](Telemetry& telemetry) { telemetry.increment_counter(counter); }, + [](auto&&) {}, + }, + instance()); +} + +void increment(const Counter& counter, const std::vector& tags) { + std::visit(details::Overload{ + [&](Telemetry& telemetry) { + telemetry.increment_counter(counter, tags); + }, + [](auto&&) {}, + }, + 
instance()); +} + +void decrement(const Counter& counter) { + std::visit( + details::Overload{ + [&](Telemetry& telemetry) { telemetry.decrement_counter(counter); }, + [](auto&&) {}, + }, + instance()); +} + +void decrement(const Counter& counter, const std::vector& tags) { + std::visit(details::Overload{ + [&](Telemetry& telemetry) { + telemetry.decrement_counter(counter, tags); + }, + [](auto&&) {}, + }, + instance()); +} + +void set(const Counter& counter, uint64_t value) { + std::visit( + details::Overload{ + [&](Telemetry& telemetry) { telemetry.set_counter(counter, value); }, + [](auto&&) {}, + }, + instance()); +} + +void set(const Counter& counter, const std::vector& tags, + uint64_t value) { + std::visit(details::Overload{ + [&](Telemetry& telemetry) { + telemetry.set_counter(counter, tags, value); + }, + [](auto&&) {}, + }, + instance()); +} + +} // namespace counter + +namespace rate { +void set(const Rate& rate, uint64_t value) { + std::visit(details::Overload{ + [&](Telemetry& telemetry) { telemetry.set_rate(rate, value); }, + [](auto&&) {}, + }, + instance()); +} + +void set(const Rate& rate, const std::vector& tags, + uint64_t value) { + std::visit( + details::Overload{ + [&](Telemetry& telemetry) { telemetry.set_rate(rate, tags, value); }, + [](auto&&) {}, + }, + instance()); +} +} // namespace rate + +namespace distribution { + +void add(const Distribution& distribution, uint64_t value) { + std::visit(details::Overload{ + [&](Telemetry& telemetry) { + telemetry.add_datapoint(distribution, value); + }, + [](auto&&) {}, + }, + instance()); +} + +void add(const Distribution& distribution, const std::vector& tags, + uint64_t value) { + std::visit(details::Overload{ + [&](Telemetry& telemetry) { + telemetry.add_datapoint(distribution, tags, value); + }, + [](auto&&) {}, + }, + instance()); +} + +} // namespace distribution + } // namespace datadog::telemetry diff --git a/src/datadog/telemetry/telemetry_impl.cpp b/src/datadog/telemetry/telemetry_impl.cpp 
index ad6ec9c5..6e87ad38 100644 --- a/src/datadog/telemetry/telemetry_impl.cpp +++ b/src/datadog/telemetry/telemetry_impl.cpp @@ -88,12 +88,74 @@ nlohmann::json encode_log(const telemetry::LogMessage& log) { return encoded; } +std::string_view to_string(details::MetricType type) { + using namespace datadog::telemetry::details; + switch (type) { + case MetricType::counter: + return "count"; + case MetricType::rate: + return "rate"; + case MetricType::distribution: + return "distribution"; + } + + return ""; +} + +// TODO: do `enable_if` +template +void encode_metrics( + nlohmann::json::array_t& metrics, + const std::unordered_map, telemetry::MetricSnapshot>& + counters_snapshots) { + for (const auto& [metric_ctx, snapshots] : counters_snapshots) { + auto encoded = nlohmann::json{ + {"metric", metric_ctx.id.name}, + {"type", to_string(metric_ctx.id.type)}, + {"common", metric_ctx.id.common}, + {"namespace", metric_ctx.id.scope}, + }; + + if (!metric_ctx.tags.empty()) { + encoded.emplace("tags", metric_ctx.tags); + } + + auto points = nlohmann::json::array(); + for (const auto& [timestamp, value] : snapshots) { + points.emplace_back(nlohmann::json::array({timestamp, value})); + } + + encoded.emplace("points", points); + metrics.emplace_back(std::move(encoded)); + } +} + +nlohmann::json encode_distributions( + const std::unordered_map, + std::vector>& distributions) { + auto j = nlohmann::json::array(); + + for (const auto& [metric_ctx, values] : distributions) { + auto series = nlohmann::json{ + {"metric", metric_ctx.id.name}, + {"common", metric_ctx.id.common}, + {"namespace", metric_ctx.id.scope}, + {"points", values}, + }; + if (!metric_ctx.tags.empty()) { + series.emplace("tags", metric_ctx.tags); + } + j.emplace_back(series); + } + + return j; +} + } // namespace Telemetry::Telemetry(FinalizedConfiguration config, std::shared_ptr logger, std::shared_ptr client, - std::vector> user_metrics, std::shared_ptr event_scheduler, HTTPClient::URL agent_url, Clock clock) : 
config_(std::move(config)), @@ -104,44 +166,7 @@ Telemetry::Telemetry(FinalizedConfiguration config, http_client_(client), clock_(std::move(clock)), scheduler_(event_scheduler), - user_metrics_(std::move(user_metrics)), host_info_(get_host_info()) { - // Register all the metrics that we're tracking by adding them to the - // metrics_snapshots_ container. This allows for simpler iteration logic - // when using the values in `generate-metrics` messages. - metrics_snapshots_.emplace_back(metrics_.tracer.spans_created, - MetricSnapshot{}); - metrics_snapshots_.emplace_back(metrics_.tracer.spans_finished, - MetricSnapshot{}); - metrics_snapshots_.emplace_back(metrics_.tracer.trace_segments_created_new, - MetricSnapshot{}); - metrics_snapshots_.emplace_back( - metrics_.tracer.trace_segments_created_continued, MetricSnapshot{}); - metrics_snapshots_.emplace_back(metrics_.tracer.trace_segments_closed, - MetricSnapshot{}); - metrics_snapshots_.emplace_back(metrics_.trace_api.requests, - MetricSnapshot{}); - metrics_snapshots_.emplace_back(metrics_.trace_api.responses_1xx, - MetricSnapshot{}); - metrics_snapshots_.emplace_back(metrics_.trace_api.responses_2xx, - MetricSnapshot{}); - metrics_snapshots_.emplace_back(metrics_.trace_api.responses_3xx, - MetricSnapshot{}); - metrics_snapshots_.emplace_back(metrics_.trace_api.responses_4xx, - MetricSnapshot{}); - metrics_snapshots_.emplace_back(metrics_.trace_api.responses_5xx, - MetricSnapshot{}); - metrics_snapshots_.emplace_back(metrics_.trace_api.errors_timeout, - MetricSnapshot{}); - metrics_snapshots_.emplace_back(metrics_.trace_api.errors_network, - MetricSnapshot{}); - metrics_snapshots_.emplace_back(metrics_.trace_api.errors_status_code, - MetricSnapshot{}); - - for (auto& m : user_metrics_) { - metrics_snapshots_.emplace_back(*m, MetricSnapshot{}); - } - // Callback for successful telemetry HTTP requests, to examine HTTP // status. 
telemetry_on_response_ = [logger = logger_]( @@ -191,8 +216,7 @@ Telemetry::~Telemetry() { } Telemetry::Telemetry(Telemetry&& rhs) - : metrics_(std::move(rhs.metrics_)), - config_(std::move(rhs.config_)), + : config_(std::move(rhs.config_)), logger_(std::move(rhs.logger_)), telemetry_on_response_(std::move(rhs.telemetry_on_response_)), telemetry_on_error_(std::move(rhs.telemetry_on_error_)), @@ -201,56 +225,16 @@ Telemetry::Telemetry(Telemetry&& rhs) http_client_(rhs.http_client_), clock_(std::move(rhs.clock_)), scheduler_(std::move(rhs.scheduler_)), - user_metrics_(std::move(rhs.user_metrics_)), seq_id_(rhs.seq_id_), config_seq_ids_(rhs.config_seq_ids_), host_info_(rhs.host_info_) { cancel_tasks(rhs.tasks_); - - // Register all the metrics that we're tracking by adding them to the - // metrics_snapshots_ container. This allows for simpler iteration logic - // when using the values in `generate-metrics` messages. - metrics_snapshots_.emplace_back(metrics_.tracer.spans_created, - MetricSnapshot{}); - metrics_snapshots_.emplace_back(metrics_.tracer.spans_finished, - MetricSnapshot{}); - metrics_snapshots_.emplace_back(metrics_.tracer.trace_segments_created_new, - MetricSnapshot{}); - metrics_snapshots_.emplace_back( - metrics_.tracer.trace_segments_created_continued, MetricSnapshot{}); - metrics_snapshots_.emplace_back(metrics_.tracer.trace_segments_closed, - MetricSnapshot{}); - metrics_snapshots_.emplace_back(metrics_.trace_api.requests, - MetricSnapshot{}); - metrics_snapshots_.emplace_back(metrics_.trace_api.responses_1xx, - MetricSnapshot{}); - metrics_snapshots_.emplace_back(metrics_.trace_api.responses_2xx, - MetricSnapshot{}); - metrics_snapshots_.emplace_back(metrics_.trace_api.responses_3xx, - MetricSnapshot{}); - metrics_snapshots_.emplace_back(metrics_.trace_api.responses_4xx, - MetricSnapshot{}); - metrics_snapshots_.emplace_back(metrics_.trace_api.responses_5xx, - MetricSnapshot{}); - metrics_snapshots_.emplace_back(metrics_.trace_api.errors_timeout, - 
MetricSnapshot{}); - metrics_snapshots_.emplace_back(metrics_.trace_api.errors_network, - MetricSnapshot{}); - metrics_snapshots_.emplace_back(metrics_.trace_api.errors_status_code, - MetricSnapshot{}); - - for (auto& m : user_metrics_) { - metrics_snapshots_.emplace_back(*m, MetricSnapshot{}); - } - schedule_tasks(); } Telemetry& Telemetry::operator=(Telemetry&& rhs) { if (&rhs != this) { cancel_tasks(rhs.tasks_); - - std::swap(metrics_, rhs.metrics_); std::swap(config_, rhs.config_); std::swap(logger_, rhs.logger_); std::swap(telemetry_on_response_, rhs.telemetry_on_response_); @@ -260,47 +244,9 @@ Telemetry& Telemetry::operator=(Telemetry&& rhs) { std::swap(tracer_signature_, rhs.tracer_signature_); std::swap(http_client_, rhs.http_client_); std::swap(clock_, rhs.clock_); - std::swap(user_metrics_, rhs.user_metrics_); std::swap(seq_id_, rhs.seq_id_); std::swap(config_seq_ids_, rhs.config_seq_ids_); std::swap(host_info_, rhs.host_info_); - - // Register all the metrics that we're tracking by adding them to the - // metrics_snapshots_ container. This allows for simpler iteration logic - // when using the values in `generate-metrics` messages. 
- metrics_snapshots_.emplace_back(metrics_.tracer.spans_created, - MetricSnapshot{}); - metrics_snapshots_.emplace_back(metrics_.tracer.spans_finished, - MetricSnapshot{}); - metrics_snapshots_.emplace_back(metrics_.tracer.trace_segments_created_new, - MetricSnapshot{}); - metrics_snapshots_.emplace_back( - metrics_.tracer.trace_segments_created_continued, MetricSnapshot{}); - metrics_snapshots_.emplace_back(metrics_.tracer.trace_segments_closed, - MetricSnapshot{}); - metrics_snapshots_.emplace_back(metrics_.trace_api.requests, - MetricSnapshot{}); - metrics_snapshots_.emplace_back(metrics_.trace_api.responses_1xx, - MetricSnapshot{}); - metrics_snapshots_.emplace_back(metrics_.trace_api.responses_2xx, - MetricSnapshot{}); - metrics_snapshots_.emplace_back(metrics_.trace_api.responses_3xx, - MetricSnapshot{}); - metrics_snapshots_.emplace_back(metrics_.trace_api.responses_4xx, - MetricSnapshot{}); - metrics_snapshots_.emplace_back(metrics_.trace_api.responses_5xx, - MetricSnapshot{}); - metrics_snapshots_.emplace_back(metrics_.trace_api.errors_timeout, - MetricSnapshot{}); - metrics_snapshots_.emplace_back(metrics_.trace_api.errors_network, - MetricSnapshot{}); - metrics_snapshots_.emplace_back(metrics_.trace_api.errors_status_code, - MetricSnapshot{}); - - for (auto& m : user_metrics_) { - metrics_snapshots_.emplace_back(*m, MetricSnapshot{}); - } - schedule_tasks(); } return *this; @@ -379,37 +325,28 @@ std::string Telemetry::heartbeat_and_telemetry() { }); batch_payloads.emplace_back(std::move(heartbeat)); - auto metrics = nlohmann::json::array(); - for (auto& m : metrics_snapshots_) { - auto& metric = m.first.get(); - auto& points = m.second; - if (!points.empty()) { - auto type = metric.type(); - if (type == "count") { - metrics.emplace_back(nlohmann::json::object({ - {"metric", metric.name()}, - {"tags", metric.tags()}, - {"type", metric.type()}, - {"points", points}, - {"namespace", metric.scope()}, - {"common", metric.common()}, - })); - } else if (type == 
"gauge") { - // gauge metrics have a interval - metrics.emplace_back(nlohmann::json::object({ - {"metric", metric.name()}, - {"tags", metric.tags()}, - {"type", metric.type()}, - {"namespace", metric.scope()}, - {"interval", 10}, - {"points", points}, - {"common", metric.common()}, - })); - } - } - points.clear(); + std::unordered_map, std::vector> + distributions; + { + std::lock_guard l{distributions_mutex_}; + std::swap(distributions_, distributions); } + std::unordered_map, MetricSnapshot> counters_snapshot; + { + std::lock_guard l{counter_mutex_}; + std::swap(counters_snapshot_, counters_snapshot); + } + + std::unordered_map, MetricSnapshot> rates_snapshot; + { + std::lock_guard l{rate_mutex_}; + std::swap(rates_snapshot_, rates_snapshot); + } + + nlohmann::json::array_t metrics = nlohmann::json::array(); + encode_metrics(metrics, counters_snapshot); + encode_metrics(metrics, rates_snapshot); if (!metrics.empty()) { auto generate_metrics = nlohmann::json::object({ {"request_type", "generate-metrics"}, @@ -420,6 +357,20 @@ std::string Telemetry::heartbeat_and_telemetry() { batch_payloads.emplace_back(std::move(generate_metrics)); } + if (auto distributions_series = encode_distributions(distributions); + !distributions.empty()) { + auto distributions_json = nlohmann::json{ + {"request_type", "distributions"}, + { + "payload", + nlohmann::json{ + {"series", distributions_series}, + }, + }, + }; + batch_payloads.emplace_back(std::move(distributions_json)); + } + if (!logs_.empty()) { auto encoded_logs = nlohmann::json::array(); for (const auto& log : logs_) { @@ -455,37 +406,9 @@ std::string Telemetry::app_closing() { }); batch_payloads.emplace_back(std::move(app_closing)); - auto metrics = nlohmann::json::array(); - for (auto& m : metrics_snapshots_) { - auto& metric = m.first.get(); - auto& points = m.second; - if (!points.empty()) { - auto type = metric.type(); - if (type == "count") { - metrics.emplace_back(nlohmann::json::object({ - {"metric", metric.name()}, 
- {"tags", metric.tags()}, - {"type", metric.type()}, - {"points", points}, - {"common", metric.common()}, - {"namespace", metric.scope()}, - })); - } else if (type == "gauge") { - // gauge metrics have a interval - metrics.emplace_back(nlohmann::json::object({ - {"metric", metric.name()}, - {"tags", metric.tags()}, - {"type", metric.type()}, - {"interval", 10}, - {"points", points}, - {"common", metric.common()}, - {"namespace", metric.scope()}, - })); - } - } - points.clear(); - } - + nlohmann::json::array_t metrics = nlohmann::json::array(); + encode_metrics(metrics, counters_snapshot_); + encode_metrics(metrics, rates_snapshot_); if (!metrics.empty()) { auto generate_metrics = nlohmann::json::object({ {"request_type", "generate-metrics"}, @@ -496,6 +419,20 @@ std::string Telemetry::app_closing() { batch_payloads.emplace_back(std::move(generate_metrics)); } + if (auto distributions_series = encode_distributions(distributions_); + !distributions_.empty()) { + auto distributions_json = nlohmann::json{ + {"request_type", "distributions"}, + { + "payload", + nlohmann::json{ + {"series", distributions_series}, + }, + }, + }; + batch_payloads.emplace_back(std::move(distributions_json)); + } + if (!logs_.empty()) { auto encoded_logs = nlohmann::json::array(); for (const auto& log : logs_) { @@ -701,12 +638,27 @@ void Telemetry::capture_metrics() { std::time_t timepoint = std::chrono::duration_cast( clock_().wall.time_since_epoch()) .count(); - for (auto& m : metrics_snapshots_) { - auto value = m.first.get().capture_and_reset_value(); - if (value == 0) { - continue; - } - m.second.emplace_back(timepoint, value); + + std::unordered_map, uint64_t> counter_snapshot; + { + std::lock_guard l{counter_mutex_}; + std::swap(counter_snapshot, counters_); + } + + for (auto& [counter, value] : counter_snapshot) { + auto& counter_snapshots = counters_snapshot_[counter]; + counter_snapshots.emplace_back(std::make_pair(timepoint, value)); + } + + std::unordered_map, uint64_t> 
rate_snapshot; + { + std::lock_guard l{rate_mutex_}; + std::swap(rate_snapshot, rates_); + } + + for (auto& [rate, value] : rate_snapshot) { + auto& rates_snapshots = rates_snapshot_[rate]; + rates_snapshots.emplace_back(std::make_pair(timepoint, value)); } } @@ -719,4 +671,57 @@ void Telemetry::log(std::string message, telemetry::LogLevel level, telemetry::LogMessage{std::move(message), level, stacktrace, timestamp}); } +void Telemetry::increment_counter(const Counter& id) { + increment_counter(id, {}); +} + +void Telemetry::increment_counter(const Counter& id, + const std::vector& tags) { + std::lock_guard l{counter_mutex_}; + counters_[{id, tags}] += 1; +} + +void Telemetry::decrement_counter(const Counter& id) { + decrement_counter(id, {}); +} + +void Telemetry::decrement_counter(const Counter& id, + const std::vector& tags) { + std::lock_guard l{counter_mutex_}; + auto& v = counters_[{id, tags}]; + if (v > 0) v -= 1; +} + +void Telemetry::set_counter(const Counter& id, uint64_t value) { + set_counter(id, {}, value); +} + +void Telemetry::set_counter(const Counter& id, + const std::vector& tags, + uint64_t value) { + std::lock_guard l{counter_mutex_}; + counters_[{id, tags}] = value; +} + +void Telemetry::set_rate(const Rate& id, uint64_t value) { + set_rate(id, {}, value); +} + +void Telemetry::set_rate(const Rate& id, const std::vector& tags, + uint64_t value) { + std::lock_guard l{rate_mutex_}; + rates_[{id, tags}] = value; +} + +void Telemetry::add_datapoint(const Distribution& id, uint64_t value) { + add_datapoint(id, {}, value); +} + +void Telemetry::add_datapoint(const Distribution& id, + const std::vector& tags, + uint64_t value) { + std::lock_guard l{distributions_mutex_}; + distributions_[{id, tags}].emplace_back(value); +} + } // namespace datadog::telemetry diff --git a/src/datadog/telemetry/telemetry_impl.h b/src/datadog/telemetry/telemetry_impl.h index 33a4d586..ef72197a 100644 --- a/src/datadog/telemetry/telemetry_impl.h +++ 
b/src/datadog/telemetry/telemetry_impl.h @@ -9,16 +9,26 @@ #include #include +#include + #include "json.hpp" #include "log.h" +#include "metric_context.h" #include "platform_util.h" namespace datadog::telemetry { +using MetricSnapshot = std::vector>; + /// The telemetry class is responsible for handling internal telemetry data to /// track Datadog product usage. It _can_ collect and report logs and metrics. +/// +/// NOTE(@dmehala): The current implementation can lead to a significant amount +/// of overhead if the mutex is highly contended. Unless this is proven to be +/// indeed a bottleneck, I'll embrace the KISS principle. However, in a future +/// iteration we could use a multiple-producer single-consumer queue or +/// a lock-free queue. class Telemetry final { - DefaultMetrics metrics_; /// Configuration object containing the validated settings for telemetry FinalizedConfiguration config_; /// Shared pointer to the user logger instance. @@ -32,15 +42,23 @@ class Telemetry final { tracing::Clock clock_; std::shared_ptr scheduler_; - // This uses a reference_wrapper so references to internal metric values can - // be captured, and be iterated trivially when the values need to be - // snapshotted and published in telemetry messages. - using MetricSnapshot = std::vector>; - std::vector< - std::pair, MetricSnapshot>> - metrics_snapshots_; - std::vector> user_metrics_; + /// Counter + std::mutex counter_mutex_; + std::unordered_map, uint64_t> counters_; + std::unordered_map, MetricSnapshot> counters_snapshot_; + + /// Rate + std::mutex rate_mutex_; + std::unordered_map, uint64_t> rates_; + std::unordered_map, MetricSnapshot> rates_snapshot_; + /// Distribution + /// TODO: split distribution in array of N element? 
+ std::mutex distributions_mutex_; + std::unordered_map, std::vector> + distributions_; + + /// Configuration std::vector configuration_snapshot_; std::vector logs_; @@ -61,7 +79,6 @@ class Telemetry final { Telemetry(FinalizedConfiguration configuration, std::shared_ptr logger, std::shared_ptr client, - std::vector> user_metrics, std::shared_ptr event_scheduler, tracing::HTTPClient::URL agent_url, tracing::Clock clock = tracing::default_clock); @@ -75,10 +92,6 @@ class Telemetry final { Telemetry(Telemetry&& rhs); Telemetry& operator=(Telemetry&&); - // Provides access to the telemetry metrics for updating the values. - // This value should not be stored. - inline auto& metrics() { return metrics_; } - /// Capture and report internal error message to Datadog. /// /// @param message The error message. @@ -95,6 +108,27 @@ class Telemetry final { void capture_configuration_change( const std::vector& new_configuration); + /// Counter + void increment_counter(const Counter& counter); + void increment_counter(const Counter& counter, + const std::vector& tags); + void decrement_counter(const Counter& counter); + void decrement_counter(const Counter& counter, + const std::vector& tags); + void set_counter(const Counter& counter, uint64_t value); + void set_counter(const Counter& counter, const std::vector& tags, + uint64_t value); + + /// Rate + void set_rate(const Rate& rate, uint64_t value); + void set_rate(const Rate& rate, const std::vector& tags, + uint64_t value); + + /// Distribution + void add_datapoint(const Distribution& distribution, uint64_t value); + void add_datapoint(const Distribution& distribution, + const std::vector& tags, uint64_t value); + private: void send_telemetry(tracing::StringView request_type, std::string payload); diff --git a/src/datadog/telemetry_metrics.cpp b/src/datadog/telemetry_metrics.cpp new file mode 100644 index 00000000..ccf56db9 --- /dev/null +++ b/src/datadog/telemetry_metrics.cpp @@ -0,0 +1,27 @@ +#include "telemetry_metrics.h" 
+ +namespace datadog::tracing::metrics { + +namespace tracer { +const telemetry::Counter spans_created = {"spans_created", "tracers", true}; +const telemetry::Counter spans_finished = {"spans_finished", "tracers", true}; + +const telemetry::Counter trace_segments_created = {"trace_segments_created", + "tracers", true}; + +const telemetry::Counter trace_segments_closed = {"trace_segments_closed", + "tracers", true}; +const telemetry::Counter context_header_truncated = { + "context_header.truncated", + "tracers", + true, +}; + +namespace api { +const telemetry::Counter requests = {"trace_api.requests", "tracers", true}; +const telemetry::Counter responses = {"trace_api.responses", "tracers", true}; +const telemetry::Counter errors = {"trace_api.errors", "tracers", true}; +} // namespace api +} // namespace tracer + +} // namespace datadog::tracing::metrics diff --git a/src/datadog/telemetry_metrics.h b/src/datadog/telemetry_metrics.h new file mode 100644 index 00000000..b3461475 --- /dev/null +++ b/src/datadog/telemetry_metrics.h @@ -0,0 +1,53 @@ +#pragma once + +#include + +namespace datadog::tracing::metrics { + +namespace tracer { + +/// The number of spans created by the tracer, tagged by manual API +/// (`integration_name:datadog`, `integration_name:otel` or +/// `integration_name:opentracing`). +extern const telemetry::Counter spans_created; + +/// The number of spans finished, optionally (if implementation allows) tagged +/// by manual API (`integration_name:datadog`, `integration_name:otel` or +/// `integration_name:opentracing`). +extern const telemetry::Counter spans_finished; + +/// The number of trace segments (local traces) created, tagged with +/// new/continued depending on whether this is a new trace (no distributed +/// context information) or continued (has distributed context). +extern const telemetry::Counter trace_segments_created; + +/// The number of trace segments (local traces) closed. 
In non partial flush +/// scenarios, trace_segments_closed == trace_chunks_enqueued. +extern const telemetry::Counter trace_segments_closed; + +/// The number of times a context propagation header is truncated, tagged by the +/// reason for truncation (`truncation_reason:baggage_item_count_exceeded`, +/// `truncation_reason:baggage_byte_count_exceeded`). +extern const telemetry::Counter context_header_truncated; + +namespace api { + +/// The number of requests sent to the trace endpoint in the agent, regardless +/// of success. +extern const telemetry::Counter requests; + +/// The number of responses received from the trace endpoint, tagged with status +/// code, e.g. `status_code:200`, `status_code:404`. May also use +/// `status_code:5xx` for example as a catch-all for 2xx, 3xx, 4xx, 5xx +/// responses. +extern const telemetry::Counter responses; + +/// The number of requests sent to the trace endpoint in the agent that errored, +/// tagged by the error type (e.g. `type:timeout`, `type:network`, +/// `type:status_code`). 
+extern const telemetry::Counter errors; + +} // namespace api +} // namespace tracer + +} // namespace datadog::tracing::metrics diff --git a/src/datadog/trace_segment.cpp b/src/datadog/trace_segment.cpp index fcdcc060..44d0d1ee 100644 --- a/src/datadog/trace_segment.cpp +++ b/src/datadog/trace_segment.cpp @@ -16,16 +16,14 @@ #include #include -#include "collector_response.h" #include "config_manager.h" #include "hex.h" -#include "json.hpp" #include "platform_util.h" -#include "random.h" #include "span_data.h" #include "span_sampler.h" #include "tag_propagation.h" #include "tags.h" +#include "telemetry_metrics.h" #include "trace_sampler.h" #include "w3c_propagation.h" @@ -142,7 +140,7 @@ Optional TraceSegment::sampling_decision() const { Logger& TraceSegment::logger() const { return *logger_; } void TraceSegment::register_span(std::unique_ptr span) { - telemetry::metrics().tracer.spans_created.inc(); + telemetry::counter::increment(metrics::tracer::spans_created); std::lock_guard lock(mutex_); assert(spans_.empty() || num_finished_spans_ < spans_.size()); @@ -151,7 +149,7 @@ void TraceSegment::register_span(std::unique_ptr span) { void TraceSegment::span_finished() { { - telemetry::metrics().tracer.spans_finished.inc(); + telemetry::counter::increment(metrics::tracer::spans_finished); std::lock_guard lock(mutex_); ++num_finished_spans_; assert(num_finished_spans_ <= spans_.size()); @@ -242,7 +240,7 @@ void TraceSegment::span_finished() { } } - telemetry::metrics().tracer.trace_segments_closed.inc(); + telemetry::counter::increment(metrics::tracer::trace_segments_closed); } void TraceSegment::override_sampling_priority(SamplingPriority priority) { diff --git a/src/datadog/tracer.cpp b/src/datadog/tracer.cpp index dac30f13..1e2e9e17 100644 --- a/src/datadog/tracer.cpp +++ b/src/datadog/tracer.cpp @@ -26,6 +26,7 @@ #include "span_data.h" #include "span_sampler.h" #include "tags.h" +#include "telemetry_metrics.h" #include "trace_sampler.h" #include "w3c_propagation.h" 
@@ -59,7 +60,6 @@ Tracer::Tracer(const FinalizedTracerConfig& config, baggage_injection_enabled_(false), baggage_extraction_enabled_(false) { telemetry::init(config.telemetry, logger_, config.http_client, - std::vector>{}, config.event_scheduler, config.agent_url); if (config.report_hostname) { hostname_ = get_hostname(); @@ -181,7 +181,8 @@ Span Tracer::create_span(const SpanConfig& config) { } const auto span_data_ptr = span_data.get(); - telemetry::metrics().tracer.trace_segments_created_new.inc(); + telemetry::counter::increment(metrics::tracer::trace_segments_created, + {"new_continued:new"}); const auto segment = std::make_shared( logger_, collector_, config_manager_->trace_sampler(), span_sampler_, defaults, config_manager_, runtime_id_, injection_styles_, hostname_, @@ -380,7 +381,8 @@ Expected Tracer::extract_span(const DictReader& reader, } const auto span_data_ptr = span_data.get(); - telemetry::metrics().tracer.trace_segments_created_continued.inc(); + telemetry::counter::increment(metrics::tracer::trace_segments_created, + {"new_continued:continued"}); const auto segment = std::make_shared( logger_, collector_, config_manager_->trace_sampler(), span_sampler_, config_manager_->span_defaults(), config_manager_, runtime_id_, @@ -441,9 +443,13 @@ Expected Tracer::inject(const Baggage& baggage, DictWriter& writer) { err->with_prefix("failed to serialize all baggage items: ")); if (err->code == Error::Code::BAGGAGE_MAXIMUM_BYTES_REACHED) { - telemetry::metrics().tracer.baggage_bytes_exceeded.inc(); + telemetry::counter::increment( + metrics::tracer::context_header_truncated, + {"truncation_reason:baggage_byte_count_exceeded"}); } else if (err->code == Error::Code::BAGGAGE_MAXIMUM_ITEMS_REACHED) { - telemetry::metrics().tracer.baggage_items_exceeded.inc(); + telemetry::counter::increment( + metrics::tracer::context_header_truncated, + {"truncation_reason:baggage_item_count_exceeded"}); } } diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 
ab4f084b..ecc6aede 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -18,7 +18,6 @@ add_executable(tests # telemetry test cases telemetry/test_configuration.cpp - telemetry/test_metrics.cpp telemetry/test_telemetry.cpp # test cases diff --git a/test/telemetry/test_metrics.cpp b/test/telemetry/test_metrics.cpp deleted file mode 100644 index 3078c084..00000000 --- a/test/telemetry/test_metrics.cpp +++ /dev/null @@ -1,40 +0,0 @@ -// This test covers operations defined for metrics defined in `metrics.h`. - -#include - -#include "test.h" - -using namespace datadog::telemetry; - -#define TELEMETRY_METRICS_TEST(x) \ - TEST_CASE(x, "[telemetry],[telemetry.metrics]") - -TELEMETRY_METRICS_TEST("Counter metrics") { - CounterMetric metric = { - "test.counter.metric", "test_scope", {"testing-testing:123"}, true}; - - metric.inc(); - metric.add(41); - REQUIRE(metric.value() == 42); - auto captured_value = metric.capture_and_reset_value(); - REQUIRE(captured_value == 42); - REQUIRE(metric.value() == 0); -} - -TELEMETRY_METRICS_TEST("Gauge metrics") { - GaugeMetric metric = { - "test.gauge.metric", "test_scope", {"testing-testing:123"}, true}; - metric.set(40); - metric.inc(); - metric.add(10); - metric.sub(8); - metric.dec(); - REQUIRE(metric.value() == 42); - auto captured_value = metric.capture_and_reset_value(); - REQUIRE(captured_value == 42); - REQUIRE(metric.value() == 0); - - metric.add(10); - metric.sub(11); - REQUIRE(metric.value() == 0); -} diff --git a/test/telemetry/test_telemetry.cpp b/test/telemetry/test_telemetry.cpp index f3795475..a71a3250 100644 --- a/test/telemetry/test_telemetry.cpp +++ b/test/telemetry/test_telemetry.cpp @@ -89,12 +89,7 @@ TELEMETRY_IMPLEMENTATION_TEST("Tracer telemetry lifecycle") { SECTION("ctor send app-started message") { SECTION("Without a defined integration") { - Telemetry telemetry{*finalize_config(), - logger, - client, - std::vector>{}, - scheduler, - *url}; + Telemetry telemetry{*finalize_config(), logger, client, 
scheduler, *url}; /// By default the integration is `datadog` with the tracer version. /// TODO: remove the default because these datadog are already part of the /// request header. @@ -114,11 +109,7 @@ TELEMETRY_IMPLEMENTATION_TEST("Tracer telemetry lifecycle") { Configuration cfg; cfg.integration_name = "nginx"; cfg.integration_version = "1.25.2"; - Telemetry telemetry2{*finalize_config(cfg), - logger, - client, - std::vector>{}, - scheduler, + Telemetry telemetry2{*finalize_config(cfg), logger, client, scheduler, *url}; auto app_started = nlohmann::json::parse(client->request_body); @@ -144,12 +135,7 @@ TELEMETRY_IMPLEMENTATION_TEST("Tracer telemetry lifecycle") { ddtest::EnvGuard install_time_env("DD_INSTRUMENTATION_INSTALL_TIME", "1703188212"); - Telemetry telemetry4{*finalize_config(), - logger, - client, - std::vector>{}, - scheduler, - *url}; + Telemetry telemetry4{*finalize_config(), logger, client, scheduler, *url}; auto app_started = nlohmann::json::parse(client->request_body); REQUIRE(is_valid_telemetry_payload(app_started) == true); @@ -191,11 +177,7 @@ TELEMETRY_IMPLEMENTATION_TEST("Tracer telemetry lifecycle") { Configuration cfg; cfg.products.emplace_back(std::move(product)); - Telemetry telemetry3{*finalize_config(cfg), - logger, - client, - std::vector>{}, - scheduler, + Telemetry telemetry3{*finalize_config(cfg), logger, client, scheduler, *url}; auto app_started = nlohmann::json::parse(client->request_body); @@ -299,12 +281,7 @@ TELEMETRY_IMPLEMENTATION_TEST("Tracer telemetry lifecycle") { SECTION("dtor send app-closing message") { { - Telemetry telemetry{*finalize_config(), - logger, - client, - std::vector>{}, - scheduler, - *url}; + Telemetry telemetry{*finalize_config(), logger, client, scheduler, *url}; client->clear(); } @@ -317,10 +294,9 @@ TELEMETRY_IMPLEMENTATION_TEST("Tracer telemetry lifecycle") { } TELEMETRY_IMPLEMENTATION_TEST("Tracer telemetry API") { - const std::time_t mock_time = 1672484400; - const Clock clock = [&mock_time]() 
{ + const Clock clock = [] { TimePoint result; - result.wall = std::chrono::system_clock::from_time_t(mock_time); + result.wall = std::chrono::system_clock::from_time_t(1672484400); return result; }; @@ -335,13 +311,8 @@ TELEMETRY_IMPLEMENTATION_TEST("Tracer telemetry API") { auto url = HTTPClient::URL::parse("http://localhost:8000"); - Telemetry telemetry{*finalize_config(), - logger, - client, - std::vector>{}, - scheduler, - *url, - clock}; + Telemetry telemetry{*finalize_config(), logger, client, + scheduler, *url, clock}; SECTION("generates a heartbeat message") { client->clear(); @@ -355,100 +326,422 @@ TELEMETRY_IMPLEMENTATION_TEST("Tracer telemetry API") { REQUIRE(heartbeat["request_type"] == "app-heartbeat"); } - SECTION("captures metrics and sends generate-metrics payload") { - telemetry.metrics().tracer.trace_segments_created_new.inc(); - REQUIRE(telemetry.metrics().tracer.trace_segments_created_new.value() == 1); - scheduler->trigger_metrics_capture(); - scheduler->trigger_heartbeat(); - - REQUIRE(telemetry.metrics().tracer.trace_segments_created_new.value() == 0); + SECTION("metrics reporting") { + SECTION("counters are correctly serialized in generate-metrics payload") { + client->clear(); + /// test cases for counters: + /// - can't decrement below zero. -> is that a telemetry requirements? + /// - rates or counter reset to zero after capture. 
+ const Counter my_counter{"my_counter", "counter-test", true}; + telemetry.increment_counter(my_counter); // = 1 + telemetry.increment_counter(my_counter); // = 2 + telemetry.increment_counter(my_counter); // = 3 + telemetry.decrement_counter(my_counter); // = 2 + scheduler->trigger_metrics_capture(); + + telemetry.increment_counter(my_counter); // = 1 + scheduler->trigger_metrics_capture(); + + telemetry.set_counter(my_counter, 42); + telemetry.set_counter(my_counter, {"event:test"}, 100); + telemetry.decrement_counter(my_counter, {"event:test"}); + scheduler->trigger_metrics_capture(); + + // Expect 2 series: + // - `my_counter` without tags: 3 datapoint (2, 1, 42) with the same + // timestamp. + // - `my_counter` with `event:test` tags: 1 datapoint (99). + scheduler->trigger_heartbeat(); - auto heartbeat_and_telemetry_message = client->request_body; - auto message_batch = nlohmann::json::parse(heartbeat_and_telemetry_message); - REQUIRE(is_valid_telemetry_payload(message_batch) == true); - REQUIRE(message_batch["payload"].size() == 2); - auto generate_metrics = message_batch["payload"][1]; - REQUIRE(generate_metrics["request_type"] == "generate-metrics"); - auto payload = generate_metrics["payload"]; - auto series = payload["series"]; - REQUIRE(series.size() == 1); - auto metric = series[0]; - REQUIRE(metric["metric"] == "trace_segments_created"); - auto tags = metric["tags"]; - REQUIRE(tags.size() == 1); - REQUIRE(tags[0] == "new_continued:new"); - auto points = metric["points"]; - REQUIRE(points.size() == 1); - REQUIRE(points[0][0] == mock_time); - REQUIRE(points[0][1] == 1); - } + auto message_batch = nlohmann::json::parse(client->request_body); + REQUIRE(is_valid_telemetry_payload(message_batch) == true); + REQUIRE(message_batch["payload"].size() == 2); + auto generate_metrics = message_batch["payload"][1]; + REQUIRE(generate_metrics["request_type"] == "generate-metrics"); + auto payload = generate_metrics["payload"]; - SECTION("logs serialization") { - 
SECTION("log level is correct") { - struct TestCase { - std::string_view name; - std::string input; - Optional stacktrace; - std::function& stacktrace)> - apply; - std::string expected_log_level; - }; + auto series = payload["series"]; + REQUIRE(series.size() == 2); - auto test_case = GENERATE(values({ + const auto expected_metrics = nlohmann::json::parse(R"( + [ { - "warning log", - "This is a warning log!", - nullopt, - [](Telemetry& telemetry, const std::string& input, - const Optional&) { - telemetry.log_warning(input); - }, - "WARNING", + "common": true, + "metric": "my_counter", + "namespace": "counter-test", + "points": [ + [ 1672484400, 99 ] + ], + "tags": [ "event:test" ], + "type": "count" }, { - "error log", - "This is an error log!", - nullopt, - [](Telemetry& telemetry, const std::string& input, - const Optional&) { telemetry.log_error(input); }, - "ERROR", - }, + "common": true, + "metric": "my_counter", + "namespace": "counter-test", + "points": [ + [ 1672484400, 2 ], + [ 1672484400, 1 ], + [ 1672484400, 42 ] + ], + "type": "count" + } + ] + )"); + + for (const auto& s : series) { + if (s.contains("tags")) { + CHECK(s == expected_metrics[0]); + } else { + CHECK(s == expected_metrics[1]); + } + } + + // Make sure the next heartbeat doesn't contains counters if no + // datapoint has been incremented, decremented or set. 
+ client->clear(); + scheduler->trigger_heartbeat(); + + auto message_batch2 = nlohmann::json::parse(client->request_body); + REQUIRE(is_valid_telemetry_payload(message_batch2) == true); + REQUIRE(message_batch2["payload"].size() == 1); + + auto payload2 = message_batch2["payload"][0]; + CHECK(payload2["request_type"] == "app-heartbeat"); + } + + SECTION("counters can't go below zero") { + client->clear(); + const Counter positive_counter{"positive_counter", "counter-test2", true}; + telemetry.decrement_counter(positive_counter); // = 0 + telemetry.decrement_counter(positive_counter); // = 0 + telemetry.decrement_counter(positive_counter); // = 0 + + scheduler->trigger_metrics_capture(); + scheduler->trigger_heartbeat(); + + auto message_batch = nlohmann::json::parse(client->request_body); + REQUIRE(is_valid_telemetry_payload(message_batch) == true); + REQUIRE(message_batch["payload"].size() == 2); + + auto generate_metrics = message_batch["payload"][1]; + REQUIRE(generate_metrics["request_type"] == "generate-metrics"); + auto payload = generate_metrics["payload"]; + + auto series = payload["series"]; + REQUIRE(series.size() == 1); + + const auto expected_metrics = nlohmann::json::parse(R"( + [ { - "error log with stacktrace", - "This is an error log with a fake stacktrace!", - "error here\nthen here\nfinally here\n", - [](Telemetry& telemetry, const std::string& input, - Optional stacktrace) { - telemetry.log_error(input, *stacktrace); - }, - "ERROR", + "common": true, + "metric": "positive_counter", + "namespace": "counter-test2", + "points": [ + [ 1672484400, 0 ] + ], + "type": "count" + } + ] + )"); + CHECK(series == expected_metrics); + } + + SECTION("rate") { + client->clear(); + + Rate rps{"request", "rate-test", true}; + telemetry.set_rate(rps, 1000); + + scheduler->trigger_metrics_capture(); + + telemetry.set_rate(rps, 2000); + telemetry.set_rate(rps, 5000); + telemetry.set_rate(rps, {"status:2xx"}, 5000); + + scheduler->trigger_metrics_capture(); + + //
Expect 2 series: + // - `request` without tags: 2 datapoint (1000, 5000) + // - `request` with tags: 1 datapoint (5000) + scheduler->trigger_heartbeat(); + + auto message_batch = nlohmann::json::parse(client->request_body); + REQUIRE(is_valid_telemetry_payload(message_batch) == true); + REQUIRE(message_batch["payload"].size() == 2); + auto generate_metrics = message_batch["payload"][1]; + REQUIRE(generate_metrics["request_type"] == "generate-metrics"); + auto payload = generate_metrics["payload"]; + + auto series = payload["series"]; + REQUIRE(series.size() == 2); + + const auto expected_metrics = nlohmann::json::parse(R"( + [ + { + "common": true, + "metric": "request", + "namespace": "rate-test", + "points": [ + [ 1672484400, 5000 ] + ], + "tags": [ "status:2xx" ], + "type": "rate" }, - })); + { + "common": true, + "metric": "request", + "namespace": "rate-test", + "points": [ + [ 1672484400, 1000 ], + [ 1672484400, 5000 ] + ], + "type": "rate" + } + ] + )"); + + for (const auto& s : series) { + if (s.contains("tags")) { + CHECK(s == expected_metrics[0]); + } else { + CHECK(s == expected_metrics[1]); + } + } + + // Make sure the next heartbeat doesn't contains distributions if no + // datapoint has been added to a distribution. 
+ client->clear(); + scheduler->trigger_heartbeat(); - CAPTURE(test_case.name); + auto message_batch2 = nlohmann::json::parse(client->request_body); + REQUIRE(is_valid_telemetry_payload(message_batch2) == true); + REQUIRE(message_batch2["payload"].size() == 1); + auto payload2 = message_batch2["payload"][0]; + CHECK(payload2["request_type"] == "app-heartbeat"); + } + + SECTION("distribution") { client->clear(); - test_case.apply(telemetry, test_case.input, test_case.stacktrace); + + Distribution response_time{"response_time", "dist-test", false}; + telemetry.add_datapoint(response_time, 128); + telemetry.add_datapoint(response_time, 42); + telemetry.add_datapoint(response_time, 3000); + + // Add a tag, this will add a new series to the distribution payload. + telemetry.add_datapoint(response_time, {"status:200", "method:GET"}, + 6530); + + Distribution request_size{"request_size", "dist-test-2", true}; + telemetry.add_datapoint(request_size, 1843); + telemetry.add_datapoint(request_size, 4135); + + // Expect 3 series: + // - `response_time` without tags: 3 datapoint (128, 42, 3000). + // - `response_time` with 2 tags: 1 datapoint (6530). + // - `request_size`: 2 datapoint (1843, 4135).
scheduler->trigger_heartbeat(); auto message_batch = nlohmann::json::parse(client->request_body); - REQUIRE(is_valid_telemetry_payload(message_batch)); + REQUIRE(is_valid_telemetry_payload(message_batch) == true); REQUIRE(message_batch["payload"].size() == 2); - auto logs_message = message_batch["payload"][1]; - REQUIRE(logs_message["request_type"] == "logs"); + auto distribution_message = message_batch["payload"][1]; + REQUIRE(distribution_message["request_type"] == "distributions"); + + auto distribution_series = distribution_message["payload"]["series"]; + REQUIRE(distribution_series.size() == 3); + + const auto expected_series = nlohmann::json::parse(R"([ + { + "common":false, + "metric":"response_time", + "namespace":"dist-test", + "points": [6530], + "tags":["status:200","method:GET"] + }, + { + "common":true, + "metric": "request_size", + "namespace":"dist-test-2", + "points":[1843,4135] + }, + { + "common": false, + "metric":"response_time", + "namespace":"dist-test", + "points":[128,42,3000] + } + ])"); + + for (const auto& s : distribution_series) { + if (s["metric"] == "response_time") { + if (s.contains("tags")) { + CHECK(s == expected_series[0]); + } else { + CHECK(s == expected_series[2]); + } + } else if (s["metric"] == "request_size") { + CHECK(s == expected_series[1]); + } else { + FAIL(); + } + } + + // Make sure the next heartbeat doesn't contains distributions if no + // datapoint has been added to a distribution. 
+ client->clear(); + scheduler->trigger_heartbeat(); - auto logs_payload = logs_message["payload"]["logs"]; - REQUIRE(logs_payload.size() == 1); - CHECK(logs_payload[0]["level"] == test_case.expected_log_level); - CHECK(logs_payload[0]["message"] == test_case.input); - CHECK(logs_payload[0].contains("tracer_time")); + auto message_batch2 = nlohmann::json::parse(client->request_body); + REQUIRE(is_valid_telemetry_payload(message_batch2) == true); + REQUIRE(message_batch2["payload"].size() == 1); + + auto payload = message_batch2["payload"][0]; + CHECK(payload["request_type"] == "app-heartbeat"); + } - if (test_case.stacktrace) { - CHECK(logs_payload[0]["stack_trace"] == test_case.stacktrace); - } else { - CHECK(logs_payload[0].contains("stack_trace") == false); + SECTION("dtor sends metrics and distributions") { + // metrics captured before the aggregation task + const Distribution response_time{"response_time", "dist-test", false}; + const Rate rps{"request", "rate-test", true}; + const Counter my_counter{"my_counter", "counter-test", true}; + { + Telemetry tmp_telemetry{*finalize_config(), logger, client, + scheduler, *url, clock}; + tmp_telemetry.increment_counter(my_counter); // = 1 + tmp_telemetry.add_datapoint(response_time, 128); + tmp_telemetry.set_rate(rps, 1000); + client->clear(); + } + + // Expect 2 metrics with 1 datapoint each and 1 distribution + auto message_batch = nlohmann::json::parse(client->request_body); + REQUIRE(is_valid_telemetry_payload(message_batch) == true); + REQUIRE(message_batch["payload"].size() == 3); + + for (const auto& payload : message_batch["payload"]) { + const auto& req_type = payload["request_type"]; + if (req_type == "generate-metrics") { + const auto& metrics_series = payload["payload"]["series"]; + REQUIRE(metrics_series.size() == 2); + + for (const auto& s : metrics_series) { + if (s["metric"] == "my_counter") { + const auto expected_counter = nlohmann::json::parse(R"( + { + "common":true, + "metric":"my_counter", +
"namespace":"counter-test", + "type": "count", + "points": [[1672484400, 1]] + } + )"); + CHECK(s == expected_counter); + } else if (s["metric"] == "request") { + const auto expected_rate = nlohmann::json::parse(R"( + { + "common":true, + "metric":"request", + "namespace":"rate-test", + "type": "rate", + "points": [[1672484400, 1000]] + } + )"); + CHECK(s == expected_rate); + } else { + FAIL("unexpected metrics name, got " << s["metric"]); + } + } + } else if (req_type == "distributions") { + const auto& distribution_series = payload["payload"]["series"]; + REQUIRE(distribution_series.size() == 1); + + const auto& d0 = distribution_series[0]; + const auto expected_d0 = nlohmann::json::parse(R"( + { + "common":false, + "metric":"response_time", + "namespace":"dist-test", + "points": [128] + } + )"); + CHECK(d0 == expected_d0); + } + } + } + + SECTION("logs serialization") { + SECTION("log level is correct") { + struct TestCase { + std::string_view name; + std::string input; + Optional stacktrace; + std::function& stacktrace)> + apply; + std::string expected_log_level; + }; + + auto test_case = GENERATE(values({ + { + "warning log", + "This is a warning log!", + nullopt, + [](Telemetry& telemetry, const std::string& input, + const Optional&) { + telemetry.log_warning(input); + }, + "WARNING", + }, + { + "error log", + "This is an error log!", + nullopt, + [](Telemetry& telemetry, const std::string& input, + const Optional&) { + telemetry.log_error(input); + }, + "ERROR", + }, + { + "error log with stacktrace", + "This is an error log with a fake stacktrace!", + "error here\nthen here\nfinally here\n", + [](Telemetry& telemetry, const std::string& input, + Optional stacktrace) { + telemetry.log_error(input, *stacktrace); + }, + "ERROR", + }, + })); + + CAPTURE(test_case.name); + + client->clear(); + test_case.apply(telemetry, test_case.input, test_case.stacktrace); + scheduler->trigger_heartbeat(); + + auto message_batch = nlohmann::json::parse(client->request_body); 
+ REQUIRE(is_valid_telemetry_payload(message_batch)); + REQUIRE(message_batch["payload"].size() == 2); + + auto logs_message = message_batch["payload"][1]; + REQUIRE(logs_message["request_type"] == "logs"); + + auto logs_payload = logs_message["payload"]["logs"]; + REQUIRE(logs_payload.size() == 1); + CHECK(logs_payload[0]["level"] == test_case.expected_log_level); + CHECK(logs_payload[0]["message"] == test_case.input); + CHECK(logs_payload[0].contains("tracer_time")); + + if (test_case.stacktrace) { + CHECK(logs_payload[0]["stack_trace"] == test_case.stacktrace); + } else { + CHECK(logs_payload[0].contains("stack_trace") == false); + } } } } @@ -464,7 +757,6 @@ TELEMETRY_IMPLEMENTATION_TEST("Tracer telemetry configuration") { auto logger = std::make_shared(); auto client = std::make_shared(); auto scheduler = std::make_shared(); - std::vector> metrics; const TracerSignature tracer_signature{ /* runtime_id = */ RuntimeID::generate(), @@ -480,7 +772,7 @@ TELEMETRY_IMPLEMENTATION_TEST("Tracer telemetry configuration") { auto final_cfg = finalize_config(cfg); REQUIRE(final_cfg); - Telemetry telemetry(*final_cfg, logger, client, metrics, scheduler, *url); + Telemetry telemetry(*final_cfg, logger, client, scheduler, *url); CHECK(scheduler->metrics_callback == nullptr); CHECK(scheduler->metrics_interval == nullopt); } @@ -493,7 +785,7 @@ TELEMETRY_IMPLEMENTATION_TEST("Tracer telemetry configuration") { auto final_cfg = finalize_config(cfg); REQUIRE(final_cfg); - Telemetry telemetry(*final_cfg, logger, client, metrics, scheduler, *url); + Telemetry telemetry(*final_cfg, logger, client, scheduler, *url); CHECK(scheduler->metrics_callback != nullptr); CHECK(scheduler->metrics_interval == 500ms); @@ -510,7 +802,7 @@ TELEMETRY_IMPLEMENTATION_TEST("Tracer telemetry configuration") { auto final_cfg = finalize_config(cfg); REQUIRE(final_cfg); - Telemetry telemetry(*final_cfg, logger, client, metrics, scheduler, *url); + Telemetry telemetry(*final_cfg, logger, client, scheduler, 
*url); telemetry.log_error("error"); // NOTE(@dmehala): logs are sent with an heartbeat.