From 0c1a8c53b05564459d95e76a48b7f1915a404203 Mon Sep 17 00:00:00 2001 From: Damien Mehala Date: Sat, 5 Apr 2025 13:21:43 +0200 Subject: [PATCH 1/4] [part 5] refactor!(telemetry): use configured intervals MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previously, telemetry metrics were collected every 10 seconds and heartbeats were sent every 60 seconds, matching the default configuration. However, these intervals can be customized via environment variables or code. This commit updates the telemetry logic to use the configured intervals—whether set through environment variables or directly in the code—ensuring that metrics , heartbeat and logs messages are sent accordingly. --- src/datadog/telemetry/telemetry_impl.cpp | 24 +++-- test/telemetry/test_telemetry.cpp | 131 +++++++++++++++++++---- 2 files changed, 125 insertions(+), 30 deletions(-) diff --git a/src/datadog/telemetry/telemetry_impl.cpp b/src/datadog/telemetry/telemetry_impl.cpp index 7eee29e1..f6fc457d 100644 --- a/src/datadog/telemetry/telemetry_impl.cpp +++ b/src/datadog/telemetry/telemetry_impl.cpp @@ -94,18 +94,15 @@ Telemetry::Telemetry(FinalizedConfiguration config, } void Telemetry::schedule_tasks() { - // Only schedule this if telemetry is enabled. - // Every 10 seconds, have the tracer telemetry capture the metrics - // values. Every 60 seconds, also report those values to the datadog - // agent. tasks_.emplace_back(scheduler_->schedule_recurring_event( - std::chrono::seconds(10), [this, n = 0]() mutable { - n++; - tracer_telemetry_->capture_metrics(); - if (n % 6 == 0) { - send_heartbeat_and_telemetry(); - } - })); + config_.heartbeat_interval, + [this]() { send_heartbeat_and_telemetry(); })); + + if (config_.report_metrics) { + tasks_.emplace_back(scheduler_->schedule_recurring_event( + config_.metrics_interval, + [this]() mutable { tracer_telemetry_->capture_metrics(); })); + } } Telemetry::~Telemetry() { @@ -197,15 +194,20 @@ Telemetry& Telemetry::operator=(Telemetry&& rhs) { return *this; } +// TODO(@dmehala): Move `report_logs` check in the serialization once +// `TracerTelemetry` will be removed. void Telemetry::log_error(std::string message) { + if (!config_.report_logs) return; tracer_telemetry_->log(std::move(message), LogLevel::ERROR); } void Telemetry::log_error(std::string message, std::string stacktrace) { + if (!config_.report_logs) return; tracer_telemetry_->log(std::move(message), LogLevel::ERROR, stacktrace); } void Telemetry::log_warning(std::string message) { + if (!config_.report_logs) return; tracer_telemetry_->log(std::move(message), LogLevel::WARNING); } diff --git a/test/telemetry/test_telemetry.cpp b/test/telemetry/test_telemetry.cpp index 8d81c13f..3d0d5369 100644 --- a/test/telemetry/test_telemetry.cpp +++ b/test/telemetry/test_telemetry.cpp @@ -10,13 +10,13 @@ #include "datadog/runtime_id.h" #include "datadog/telemetry/telemetry_impl.h" -#include "mocks/event_schedulers.h" #include "mocks/http_clients.h" #include "mocks/loggers.h" #include "test.h" using namespace datadog::tracing; using namespace datadog::telemetry; +using namespace std::chrono_literals; namespace { bool is_valid_telemetry_payload(const nlohmann::json& json) { @@ -31,6 +31,43 @@ bool is_valid_telemetry_payload(const nlohmann::json& json) { json.contains("/host"_json_pointer); } +struct FakeEventScehduler : public EventScheduler { + size_t count_tasks = 0; + std::function heartbeat_callback = nullptr; + std::function metrics_callback = nullptr; + Optional heartbeat_interval; + Optional metrics_interval; + bool cancelled = false; + + // NOTE: White box testing. This is a limitation of the event scheduler API. + Cancel schedule_recurring_event(std::chrono::steady_clock::duration interval, + std::function callback) override { + if (count_tasks == 0) { + heartbeat_callback = callback; + heartbeat_interval = interval; + } else if (count_tasks == 1) { + metrics_callback = callback; + metrics_interval = interval; + } + count_tasks++; + return [this]() { cancelled = true; }; + } + + void trigger_heartbeat() { + assert(heartbeat_callback != nullptr); + heartbeat_callback(); + } + + void trigger_metrics_capture() { + assert(metrics_callback != nullptr); + metrics_callback(); + } + + std::string config() const override { + return nlohmann::json::object({{"type", "FakeEventScheduler"}}).dump(); + } +}; + } // namespace TEST_CASE("Tracer telemetry", "[telemetry]") { @@ -43,27 +80,13 @@ TEST_CASE("Tracer telemetry", "[telemetry]") { auto logger = std::make_shared(); auto client = std::make_shared(); - auto scheduler = std::make_shared(); - - auto trigger_heartbeat = [&]() { - // White box testing. The current implementation send a heartbeat every 60s - // and the task is executed every 10s. - // TODO(@dmehala): should depends on the config - scheduler->event_callback(); - scheduler->event_callback(); - scheduler->event_callback(); - scheduler->event_callback(); - scheduler->event_callback(); - scheduler->event_callback(); - }; + auto scheduler = std::make_shared(); const TracerSignature tracer_signature{ /* runtime_id = */ RuntimeID::generate(), /* service = */ "testsvc", /* environment = */ "test"}; - const std::string ignore{""}; - auto url = HTTPClient::URL::parse("http://localhost:8000"); Telemetry telemetry{*finalize_config(), logger, @@ -227,7 +250,7 @@ TEST_CASE("Tracer telemetry", "[telemetry]") { SECTION("generates a heartbeat message") { client->clear(); - trigger_heartbeat(); + scheduler->trigger_heartbeat(); auto heartbeat_message = client->request_body; auto message_batch = nlohmann::json::parse(heartbeat_message); @@ -240,7 +263,8 @@ TEST_CASE("Tracer telemetry", "[telemetry]") { SECTION("captures metrics and sends generate-metrics payload") { telemetry.metrics().tracer.trace_segments_created_new.inc(); REQUIRE(telemetry.metrics().tracer.trace_segments_created_new.value() == 1); - trigger_heartbeat(); + scheduler->trigger_metrics_capture(); + scheduler->trigger_heartbeat(); REQUIRE(telemetry.metrics().tracer.trace_segments_created_new.value() == 0); @@ -322,7 +346,7 @@ TEST_CASE("Tracer telemetry", "[telemetry]") { client->clear(); test_case.apply(telemetry, test_case.input, test_case.stacktrace); - trigger_heartbeat(); + scheduler->trigger_heartbeat(); auto message_batch = nlohmann::json::parse(client->request_body); REQUIRE(is_valid_telemetry_payload(message_batch)); @@ -345,3 +369,72 @@ TEST_CASE("Tracer telemetry", "[telemetry]") { } } } + +TEST_CASE("Tracer telemetry configuration", "[telemetry]") { + // Cases: + // - when `report_metrics` is set to false. No metrics are reported. + // - when `report_logs` is set to false. No logs are reported. + // - respects interval defined. + // - telemetry disabled doesn't send anything. + + auto logger = std::make_shared(); + auto client = std::make_shared(); + auto scheduler = std::make_shared(); + std::vector> metrics; + + const TracerSignature tracer_signature{ + /* runtime_id = */ RuntimeID::generate(), + /* service = */ "testsvc", + /* environment = */ "test"}; + + auto url = HTTPClient::URL::parse("http://localhost:8000"); + + SECTION("disabling metrics reporting do not collect metrics") { + Configuration cfg; + cfg.report_metrics = false; + + auto final_cfg = finalize_config(cfg); + REQUIRE(final_cfg); + + Telemetry telemetry(*final_cfg, logger, client, metrics, scheduler, *url); + CHECK(scheduler->metrics_callback == nullptr); + CHECK(scheduler->metrics_interval == nullopt); + } + + SECTION("intervals are respected") { + Configuration cfg; + cfg.metrics_interval_seconds = .5; + cfg.heartbeat_interval_seconds = 30; + + auto final_cfg = finalize_config(cfg); + REQUIRE(final_cfg); + + Telemetry telemetry(*final_cfg, logger, client, metrics, scheduler, *url); + CHECK(scheduler->metrics_callback != nullptr); + CHECK(scheduler->metrics_interval == 500ms); + + CHECK(scheduler->heartbeat_callback != nullptr); + CHECK(scheduler->metrics_interval != 30s); + } + + SECTION("disabling logs reporting do not collect logs") { + client->clear(); + + Configuration cfg; + cfg.report_logs = false; + + auto final_cfg = finalize_config(cfg); + REQUIRE(final_cfg); + + Telemetry telemetry(*final_cfg, logger, client, metrics, scheduler, *url); + telemetry.log_error("error"); + + // NOTE(@dmehala): logs are sent with an heartbeat. + scheduler->trigger_heartbeat(); + + auto message_batch = nlohmann::json::parse(client->request_body); + REQUIRE(is_valid_telemetry_payload(message_batch)); + REQUIRE(message_batch["payload"].size() == 1); + CHECK(message_batch["payload"][0]["request_type"] == "app-heartbeat"); + } +} From fa58332546028d9df539e906257508e3dbabc836 Mon Sep 17 00:00:00 2001 From: Damien Mehala Date: Sat, 5 Apr 2025 14:16:48 +0200 Subject: [PATCH 2/4] [part 6] refactor!(telemetry): remove dependency on `TracerTelemetry` Moves the TracerTelemetry logic directly into Telemetry, eliminating the need for a separate `TracerTelemetry` object. --- BUILD.bazel | 2 - CMakeLists.txt | 1 - src/datadog/telemetry/telemetry_impl.cpp | 578 +++++++++++++++++++---- src/datadog/telemetry/telemetry_impl.h | 52 +- src/datadog/tracer_telemetry.cpp | 411 ---------------- src/datadog/tracer_telemetry.h | 125 ----- 6 files changed, 548 insertions(+), 621 deletions(-) delete mode 100644 src/datadog/tracer_telemetry.cpp delete mode 100644 src/datadog/tracer_telemetry.h diff --git a/BUILD.bazel b/BUILD.bazel index af291d9a..e3ea62d7 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -43,7 +43,6 @@ cc_library( "src/datadog/tags.cpp", "src/datadog/threaded_event_scheduler.cpp", "src/datadog/tracer_config.cpp", - "src/datadog/tracer_telemetry.cpp", "src/datadog/tracer.cpp", "src/datadog/trace_id.cpp", "src/datadog/trace_sampler_config.cpp", @@ -76,7 +75,6 @@ cc_library( "src/datadog/tag_propagation.h", "src/datadog/tags.h", "src/datadog/threaded_event_scheduler.h", - "src/datadog/tracer_telemetry.h", "src/datadog/trace_sampler.h", "src/datadog/w3c_propagation.h", ], diff --git a/CMakeLists.txt b/CMakeLists.txt index a8a2cae7..e807b0a0 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -145,7 +145,6 @@ target_sources(dd_trace_cpp-objects src/datadog/tag_propagation.cpp src/datadog/threaded_event_scheduler.cpp src/datadog/tracer_config.cpp - src/datadog/tracer_telemetry.cpp src/datadog/tracer.cpp src/datadog/trace_id.cpp src/datadog/trace_sampler_config.cpp diff --git a/src/datadog/telemetry/telemetry_impl.cpp b/src/datadog/telemetry/telemetry_impl.cpp index f6fc457d..bb14eb4f 100644 --- a/src/datadog/telemetry/telemetry_impl.cpp +++ b/src/datadog/telemetry/telemetry_impl.cpp @@ -12,7 +12,6 @@ #include "datadog_agent.h" #include "platform_util.h" -#include "tracer_telemetry.h" using namespace datadog::tracing; using namespace std::chrono_literals; @@ -32,12 +31,69 @@ void cancel_tasks(std::vector& tasks) { tasks.clear(); } +std::string to_string(datadog::tracing::ConfigName name) { + switch (name) { + case ConfigName::SERVICE_NAME: + return "service"; + case ConfigName::SERVICE_ENV: + return "env"; + case ConfigName::SERVICE_VERSION: + return "application_version"; + case ConfigName::REPORT_TRACES: + return "trace_enabled"; + case ConfigName::TAGS: + return "trace_tags"; + case ConfigName::EXTRACTION_STYLES: + return "trace_propagation_style_extract"; + case ConfigName::INJECTION_STYLES: + return "trace_propagation_style_inject"; + case ConfigName::STARTUP_LOGS: + return "trace_startup_logs_enabled"; + case ConfigName::REPORT_TELEMETRY: + return "instrumentation_telemetry_enabled"; + case ConfigName::DELEGATE_SAMPLING: + return "DD_TRACE_DELEGATE_SAMPLING"; + case ConfigName::GENEREATE_128BIT_TRACE_IDS: + return "trace_128_bits_id_enabled"; + case ConfigName::AGENT_URL: + return "trace_agent_url"; + case ConfigName::RC_POLL_INTERVAL: + return "remote_config_poll_interval"; + case ConfigName::TRACE_SAMPLING_RATE: + return "trace_sample_rate"; + case ConfigName::TRACE_SAMPLING_LIMIT: + return "trace_rate_limit"; + case ConfigName::SPAN_SAMPLING_RULES: + return "span_sample_rules"; + case ConfigName::TRACE_SAMPLING_RULES: + return "trace_sample_rules"; + case ConfigName::TRACE_BAGGAGE_MAX_BYTES: + return "trace_baggage_max_bytes"; + case ConfigName::TRACE_BAGGAGE_MAX_ITEMS: + return "trace_baggage_max_items"; + } + + std::abort(); +} + +nlohmann::json encode_log(const telemetry::LogMessage& log) { + auto encoded = nlohmann::json{ + {"message", log.message}, + {"level", to_string(log.level)}, + {"tracer_time", log.timestamp}, + }; + if (log.stacktrace) { + encoded.emplace("stack_trace", *log.stacktrace); + } + return encoded; +} + } // namespace Telemetry::Telemetry(FinalizedConfiguration config, std::shared_ptr logger, std::shared_ptr client, - std::vector> metrics, + std::vector> user_metrics_, std::shared_ptr event_scheduler, HTTPClient::URL agent_url, Clock clock) : config_(std::move(config)), @@ -48,26 +104,41 @@ Telemetry::Telemetry(FinalizedConfiguration config, http_client_(client), clock_(std::move(clock)), scheduler_(event_scheduler) { - tracer_telemetry_ = std::make_shared( - config_.enabled, clock_, logger_, tracer_signature_, - config_.integration_name, config_.integration_version, - std::vector>{ - {metrics_.tracer.spans_created}, - {metrics_.tracer.spans_finished}, - {metrics_.tracer.trace_segments_created_new}, - {metrics_.tracer.trace_segments_created_continued}, - {metrics_.tracer.trace_segments_closed}, - {metrics_.trace_api.requests}, - {metrics_.trace_api.responses_1xx}, - {metrics_.trace_api.responses_2xx}, - {metrics_.trace_api.responses_3xx}, - {metrics_.trace_api.responses_4xx}, - {metrics_.trace_api.responses_5xx}, - {metrics_.trace_api.errors_timeout}, - {metrics_.trace_api.errors_network}, - {metrics_.trace_api.errors_status_code}, - }, - metrics); + // Register all the metrics that we're tracking by adding them to the + // metrics_snapshots_ container. This allows for simpler iteration logic + // when using the values in `generate-metrics` messages. + metrics_snapshots_.emplace_back(metrics_.tracer.spans_created, + MetricSnapshot{}); + metrics_snapshots_.emplace_back(metrics_.tracer.spans_finished, + MetricSnapshot{}); + metrics_snapshots_.emplace_back(metrics_.tracer.trace_segments_created_new, + MetricSnapshot{}); + metrics_snapshots_.emplace_back( + metrics_.tracer.trace_segments_created_continued, MetricSnapshot{}); + metrics_snapshots_.emplace_back(metrics_.tracer.trace_segments_closed, + MetricSnapshot{}); + metrics_snapshots_.emplace_back(metrics_.trace_api.requests, + MetricSnapshot{}); + metrics_snapshots_.emplace_back(metrics_.trace_api.responses_1xx, + MetricSnapshot{}); + metrics_snapshots_.emplace_back(metrics_.trace_api.responses_2xx, + MetricSnapshot{}); + metrics_snapshots_.emplace_back(metrics_.trace_api.responses_3xx, + MetricSnapshot{}); + metrics_snapshots_.emplace_back(metrics_.trace_api.responses_4xx, + MetricSnapshot{}); + metrics_snapshots_.emplace_back(metrics_.trace_api.responses_5xx, + MetricSnapshot{}); + metrics_snapshots_.emplace_back(metrics_.trace_api.errors_timeout, + MetricSnapshot{}); + metrics_snapshots_.emplace_back(metrics_.trace_api.errors_network, + MetricSnapshot{}); + metrics_snapshots_.emplace_back(metrics_.trace_api.errors_status_code, + MetricSnapshot{}); + + for (auto& m : user_metrics_) { + metrics_snapshots_.emplace_back(*m, MetricSnapshot{}); + } // Callback for successful telemetry HTTP requests, to examine HTTP // status. @@ -100,15 +171,14 @@ void Telemetry::schedule_tasks() { if (config_.report_metrics) { tasks_.emplace_back(scheduler_->schedule_recurring_event( - config_.metrics_interval, - [this]() mutable { tracer_telemetry_->capture_metrics(); })); + config_.metrics_interval, [this]() mutable { capture_metrics(); })); } } Telemetry::~Telemetry() { if (!tasks_.empty()) { cancel_tasks(tasks_); - tracer_telemetry_->capture_metrics(); + capture_metrics(); // The app-closing message is bundled with a message containing the // final metric values. send_app_closing(); @@ -120,36 +190,54 @@ Telemetry::Telemetry(Telemetry&& rhs) : metrics_(std::move(rhs.metrics_)), config_(std::move(rhs.config_)), logger_(std::move(rhs.logger_)), - // tracer_telemetry_(std::move(rhs.tracer_telemetry_)), telemetry_on_response_(std::move(rhs.telemetry_on_response_)), telemetry_on_error_(std::move(rhs.telemetry_on_error_)), telemetry_endpoint_(std::move(rhs.telemetry_endpoint_)), tracer_signature_(std::move(rhs.tracer_signature_)), http_client_(rhs.http_client_), clock_(std::move(rhs.clock_)), - scheduler_(std::move(rhs.scheduler_)) { - cancel_tasks(rhs.tasks_); + scheduler_(std::move(rhs.scheduler_)), + user_metrics_(std::move(rhs.user_metrics_)), + seq_id_(rhs.seq_id_), + config_seq_ids_(rhs.config_seq_ids_), + host_info_(rhs.host_info_) { + // Register all the metrics that we're tracking by adding them to the + // metrics_snapshots_ container. This allows for simpler iteration logic + // when using the values in `generate-metrics` messages. + metrics_snapshots_.emplace_back(metrics_.tracer.spans_created, + MetricSnapshot{}); + metrics_snapshots_.emplace_back(metrics_.tracer.spans_finished, + MetricSnapshot{}); + metrics_snapshots_.emplace_back(metrics_.tracer.trace_segments_created_new, + MetricSnapshot{}); + metrics_snapshots_.emplace_back( + metrics_.tracer.trace_segments_created_continued, MetricSnapshot{}); + metrics_snapshots_.emplace_back(metrics_.tracer.trace_segments_closed, + MetricSnapshot{}); + metrics_snapshots_.emplace_back(metrics_.trace_api.requests, + MetricSnapshot{}); + metrics_snapshots_.emplace_back(metrics_.trace_api.responses_1xx, + MetricSnapshot{}); + metrics_snapshots_.emplace_back(metrics_.trace_api.responses_2xx, + MetricSnapshot{}); + metrics_snapshots_.emplace_back(metrics_.trace_api.responses_3xx, + MetricSnapshot{}); + metrics_snapshots_.emplace_back(metrics_.trace_api.responses_4xx, + MetricSnapshot{}); + metrics_snapshots_.emplace_back(metrics_.trace_api.responses_5xx, + MetricSnapshot{}); + metrics_snapshots_.emplace_back(metrics_.trace_api.errors_timeout, + MetricSnapshot{}); + metrics_snapshots_.emplace_back(metrics_.trace_api.errors_network, + MetricSnapshot{}); + metrics_snapshots_.emplace_back(metrics_.trace_api.errors_status_code, + MetricSnapshot{}); + + for (auto& m : user_metrics_) { + metrics_snapshots_.emplace_back(*m, MetricSnapshot{}); + } - tracer_telemetry_ = std::make_shared( - config_.enabled, clock_, logger_, tracer_signature_, - config_.integration_name, config_.integration_version, - std::vector>{ - {metrics_.tracer.spans_created}, - {metrics_.tracer.spans_finished}, - {metrics_.tracer.trace_segments_created_new}, - {metrics_.tracer.trace_segments_created_continued}, - {metrics_.tracer.trace_segments_closed}, - {metrics_.trace_api.requests}, - {metrics_.trace_api.responses_1xx}, - {metrics_.trace_api.responses_2xx}, - {metrics_.trace_api.responses_3xx}, - {metrics_.trace_api.responses_4xx}, - {metrics_.trace_api.responses_5xx}, - {metrics_.trace_api.errors_timeout}, - {metrics_.trace_api.errors_network}, - {metrics_.trace_api.errors_status_code}, - }, - std::vector>{}); + cancel_tasks(rhs.tasks_); schedule_tasks(); } @@ -158,7 +246,6 @@ Telemetry& Telemetry::operator=(Telemetry&& rhs) { std::swap(metrics_, rhs.metrics_); std::swap(config_, rhs.config_); std::swap(logger_, rhs.logger_); - std::swap(tracer_telemetry_, rhs.tracer_telemetry_); std::swap(telemetry_on_response_, rhs.telemetry_on_response_); std::swap(telemetry_on_error_, rhs.telemetry_on_error_); std::swap(telemetry_endpoint_, rhs.telemetry_endpoint_); @@ -166,29 +253,48 @@ Telemetry& Telemetry::operator=(Telemetry&& rhs) { std::swap(tracer_signature_, rhs.tracer_signature_); std::swap(http_client_, rhs.http_client_); std::swap(clock_, rhs.clock_); + std::swap(user_metrics_, rhs.user_metrics_); + std::swap(seq_id_, rhs.seq_id_); + std::swap(config_seq_ids_, rhs.config_seq_ids_); + std::swap(host_info_, rhs.host_info_); + + // Register all the metrics that we're tracking by adding them to the + // metrics_snapshots_ container. This allows for simpler iteration logic + // when using the values in `generate-metrics` messages. + metrics_snapshots_.emplace_back(metrics_.tracer.spans_created, + MetricSnapshot{}); + metrics_snapshots_.emplace_back(metrics_.tracer.spans_finished, + MetricSnapshot{}); + metrics_snapshots_.emplace_back(metrics_.tracer.trace_segments_created_new, + MetricSnapshot{}); + metrics_snapshots_.emplace_back( + metrics_.tracer.trace_segments_created_continued, MetricSnapshot{}); + metrics_snapshots_.emplace_back(metrics_.tracer.trace_segments_closed, + MetricSnapshot{}); + metrics_snapshots_.emplace_back(metrics_.trace_api.requests, + MetricSnapshot{}); + metrics_snapshots_.emplace_back(metrics_.trace_api.responses_1xx, + MetricSnapshot{}); + metrics_snapshots_.emplace_back(metrics_.trace_api.responses_2xx, + MetricSnapshot{}); + metrics_snapshots_.emplace_back(metrics_.trace_api.responses_3xx, + MetricSnapshot{}); + metrics_snapshots_.emplace_back(metrics_.trace_api.responses_4xx, + MetricSnapshot{}); + metrics_snapshots_.emplace_back(metrics_.trace_api.responses_5xx, + MetricSnapshot{}); + metrics_snapshots_.emplace_back(metrics_.trace_api.errors_timeout, + MetricSnapshot{}); + metrics_snapshots_.emplace_back(metrics_.trace_api.errors_network, + MetricSnapshot{}); + metrics_snapshots_.emplace_back(metrics_.trace_api.errors_status_code, + MetricSnapshot{}); + + for (auto& m : user_metrics_) { + metrics_snapshots_.emplace_back(*m, MetricSnapshot{}); + } cancel_tasks(rhs.tasks_); - - tracer_telemetry_ = std::make_shared( - config_.enabled, clock_, logger_, tracer_signature_, - config_.integration_name, config_.integration_version, - std::vector>{ - {metrics_.tracer.spans_created}, - {metrics_.tracer.spans_finished}, - {metrics_.tracer.trace_segments_created_new}, - {metrics_.tracer.trace_segments_created_continued}, - {metrics_.tracer.trace_segments_closed}, - {metrics_.trace_api.requests}, - {metrics_.trace_api.responses_1xx}, - {metrics_.trace_api.responses_2xx}, - {metrics_.trace_api.responses_3xx}, - {metrics_.trace_api.responses_4xx}, - {metrics_.trace_api.responses_5xx}, - {metrics_.trace_api.errors_timeout}, - {metrics_.trace_api.errors_network}, - {metrics_.trace_api.errors_status_code}, - }, - std::vector>{}); schedule_tasks(); } return *this; @@ -198,22 +304,22 @@ Telemetry& Telemetry::operator=(Telemetry&& rhs) { // `TracerTelemetry` will be removed. void Telemetry::log_error(std::string message) { if (!config_.report_logs) return; - tracer_telemetry_->log(std::move(message), LogLevel::ERROR); + log(std::move(message), LogLevel::ERROR); } void Telemetry::log_error(std::string message, std::string stacktrace) { if (!config_.report_logs) return; - tracer_telemetry_->log(std::move(message), LogLevel::ERROR, stacktrace); + log(std::move(message), LogLevel::ERROR, stacktrace); } void Telemetry::log_warning(std::string message) { if (!config_.report_logs) return; - tracer_telemetry_->log(std::move(message), LogLevel::WARNING); + log(std::move(message), LogLevel::WARNING); } void Telemetry::send_telemetry(StringView request_type, std::string payload) { auto set_telemetry_headers = [request_type, payload_size = payload.size(), - debug_enabled = tracer_telemetry_->debug(), + debug_enabled = config_.debug, tracer_signature = &tracer_signature_](DictWriter& headers) { /* @@ -246,27 +352,339 @@ void Telemetry::send_telemetry(StringView request_type, std::string payload) { void Telemetry::send_app_started( const std::unordered_map& config_metadata) { - send_telemetry("app-started", - tracer_telemetry_->app_started(config_metadata)); + send_telemetry("app-started", app_started(config_metadata)); } void Telemetry::send_app_closing() { - send_telemetry("app-closing", tracer_telemetry_->app_closing()); + send_telemetry("app-closing", app_closing()); } void Telemetry::send_heartbeat_and_telemetry() { - send_telemetry("app-heartbeat", tracer_telemetry_->heartbeat_and_telemetry()); + send_telemetry("app-heartbeat", heartbeat_and_telemetry()); } void Telemetry::send_configuration_change() { - if (auto payload = tracer_telemetry_->configuration_change()) { + if (auto payload = configuration_change()) { send_telemetry("app-client-configuration-change", *payload); } } +std::string Telemetry::heartbeat_and_telemetry() { + auto batch_payloads = nlohmann::json::array(); + + auto heartbeat = nlohmann::json::object({ + {"request_type", "app-heartbeat"}, + }); + batch_payloads.emplace_back(std::move(heartbeat)); + + auto metrics = nlohmann::json::array(); + for (auto& m : metrics_snapshots_) { + auto& metric = m.first.get(); + auto& points = m.second; + if (!points.empty()) { + auto type = metric.type(); + if (type == "count") { + metrics.emplace_back(nlohmann::json::object({ + {"metric", metric.name()}, + {"tags", metric.tags()}, + {"type", metric.type()}, + {"points", points}, + {"namespace", metric.scope()}, + {"common", metric.common()}, + })); + } else if (type == "gauge") { + // gauge metrics have a interval + metrics.emplace_back(nlohmann::json::object({ + {"metric", metric.name()}, + {"tags", metric.tags()}, + {"type", metric.type()}, + {"namespace", metric.scope()}, + {"interval", 10}, + {"points", points}, + {"common", metric.common()}, + })); + } + } + points.clear(); + } + + if (!metrics.empty()) { + auto generate_metrics = nlohmann::json::object({ + {"request_type", "generate-metrics"}, + {"payload", nlohmann::json::object({ + {"series", metrics}, + })}, + }); + batch_payloads.emplace_back(std::move(generate_metrics)); + } + + if (!logs_.empty()) { + auto encoded_logs = nlohmann::json::array(); + for (const auto& log : logs_) { + auto encoded = encode_log(log); + encoded_logs.emplace_back(std::move(encoded)); + } + + assert(!encoded_logs.empty()); + + auto logs_payload = nlohmann::json::object({ + {"request_type", "logs"}, + {"payload", + nlohmann::json{ + {"logs", encoded_logs}, + }}, + }); + + batch_payloads.emplace_back(std::move(logs_payload)); + } + + auto telemetry_body = generate_telemetry_body("message-batch"); + telemetry_body["payload"] = batch_payloads; + auto message_batch_payload = telemetry_body.dump(); + + return message_batch_payload; +} + +std::string Telemetry::app_closing() { + auto batch_payloads = nlohmann::json::array(); + + auto app_closing = nlohmann::json::object({ + {"request_type", "app-closing"}, + }); + batch_payloads.emplace_back(std::move(app_closing)); + + auto metrics = nlohmann::json::array(); + for (auto& m : metrics_snapshots_) { + auto& metric = m.first.get(); + auto& points = m.second; + if (!points.empty()) { + auto type = metric.type(); + if (type == "count") { + metrics.emplace_back(nlohmann::json::object({ + {"metric", metric.name()}, + {"tags", metric.tags()}, + {"type", metric.type()}, + {"points", points}, + {"common", metric.common()}, + {"namespace", metric.scope()}, + })); + } else if (type == "gauge") { + // gauge metrics have a interval + metrics.emplace_back(nlohmann::json::object({ + {"metric", metric.name()}, + {"tags", metric.tags()}, + {"type", metric.type()}, + {"interval", 10}, + {"points", points}, + {"common", metric.common()}, + {"namespace", metric.scope()}, + })); + } + } + points.clear(); + } + + if (!metrics.empty()) { + auto generate_metrics = nlohmann::json::object({ + {"request_type", "generate-metrics"}, + {"payload", nlohmann::json::object({ + {"series", metrics}, + })}, + }); + batch_payloads.emplace_back(std::move(generate_metrics)); + } + + if (!logs_.empty()) { + auto encoded_logs = nlohmann::json::array(); + for (const auto& log : logs_) { + auto encoded = encode_log(log); + encoded_logs.emplace_back(std::move(encoded)); + } + + assert(!encoded_logs.empty()); + + auto logs_payload = nlohmann::json::object({ + {"request_type", "logs"}, + {"payload", + nlohmann::json{ + {"logs", encoded_logs}, + }}, + }); + + batch_payloads.emplace_back(std::move(logs_payload)); + } + + auto telemetry_body = generate_telemetry_body("message-batch"); + telemetry_body["payload"] = batch_payloads; + auto message_batch_payload = telemetry_body.dump(); + + return message_batch_payload; +} + +std::string Telemetry::app_started( + const std::unordered_map& configurations) { + auto configuration_json = nlohmann::json::array(); + for (const auto& [_, config_metadata] : configurations) { + // if (config_metadata.value.empty()) continue; + + configuration_json.emplace_back( + generate_configuration_field(config_metadata)); + } + + // clang-format off + auto app_started_msg = nlohmann::json{ + {"request_type", "app-started"}, + {"payload", nlohmann::json{ + {"configuration", configuration_json} + }} + }; + + auto batch = generate_telemetry_body("message-batch"); + batch["payload"] = nlohmann::json::array({ + std::move(app_started_msg) + }); + // clang-format on + + if (!config_.integration_name.empty()) { + // clang-format off + auto integration_msg = nlohmann::json{ + {"request_type", "app-integrations-change"}, + {"payload", nlohmann::json{ + {"integrations", nlohmann::json::array({ + nlohmann::json{ + {"name", config_.integration_name}, + {"version", config_.integration_version}, + {"enabled", true} + } + })} + }} + }; + // clang-format on + + batch["payload"].emplace_back(std::move(integration_msg)); + } + + return batch.dump(); +} + +Optional Telemetry::configuration_change() { + if (configuration_snapshot_.empty()) return nullopt; + + std::vector current_configuration; + std::swap(current_configuration, configuration_snapshot_); + + auto configuration_json = nlohmann::json::array(); + for (const auto& config_metadata : current_configuration) { + configuration_json.emplace_back( + generate_configuration_field(config_metadata)); + } + + auto telemetry_body = + generate_telemetry_body("app-client-configuration-change"); + telemetry_body["payload"] = + nlohmann::json{{"configuration", configuration_json}}; + + return telemetry_body.dump(); +} + +nlohmann::json Telemetry::generate_telemetry_body(std::string request_type) { + std::time_t tracer_time = std::chrono::duration_cast( + clock_().wall.time_since_epoch()) + .count(); + seq_id_++; + return nlohmann::json::object({ + {"api_version", "v2"}, + {"seq_id", seq_id_}, + {"request_type", request_type}, + {"tracer_time", tracer_time}, + {"runtime_id", tracer_signature_.runtime_id.string()}, + {"debug", config_.debug}, + {"application", + nlohmann::json::object({ + {"service_name", tracer_signature_.default_service}, + {"env", tracer_signature_.default_environment}, + {"tracer_version", tracer_signature_.library_version}, + {"language_name", tracer_signature_.library_language}, + {"language_version", tracer_signature_.library_language_version}, + })}, + {"host", + { + {"hostname", host_info_.hostname}, + {"os", host_info_.os}, + {"os_version", host_info_.os_version}, + {"architecture", host_info_.cpu_architecture}, + {"kernel_name", host_info_.kernel_name}, + {"kernel_version", host_info_.kernel_version}, + {"kernel_release", host_info_.kernel_release}, + }}, + }); +} + +nlohmann::json Telemetry::generate_configuration_field( + const ConfigMetadata& config_metadata) { + // NOTE(@dmehala): `seq_id` should start at 1 so that the go backend can + // detect between non set fields. + config_seq_ids_[config_metadata.name] += 1; + auto seq_id = config_seq_ids_[config_metadata.name]; + + auto j = nlohmann::json{{"name", to_string(config_metadata.name)}, + {"value", config_metadata.value}, + {"seq_id", seq_id}}; + + switch (config_metadata.origin) { + case ConfigMetadata::Origin::ENVIRONMENT_VARIABLE: + j["origin"] = "env_var"; + break; + case ConfigMetadata::Origin::CODE: + j["origin"] = "code"; + break; + case ConfigMetadata::Origin::REMOTE_CONFIG: + j["origin"] = "remote_config"; + break; + case ConfigMetadata::Origin::DEFAULT: + j["origin"] = "default"; + break; + } + + if (config_metadata.error) { + // clang-format off + j["error"] = { + {"code", config_metadata.error->code}, + {"message", config_metadata.error->message} + }; + // clang-format on + } + + return j; +} + void Telemetry::capture_configuration_change( const std::vector& new_configuration) { - return tracer_telemetry_->capture_configuration_change(new_configuration); + configuration_snapshot_.insert(configuration_snapshot_.begin(), + new_configuration.begin(), + new_configuration.end()); +} + +void Telemetry::capture_metrics() { + std::time_t timepoint = std::chrono::duration_cast( + clock_().wall.time_since_epoch()) + .count(); + for (auto& m : metrics_snapshots_) { + auto value = m.first.get().capture_and_reset_value(); + if (value == 0) { + continue; + } + m.second.emplace_back(timepoint, value); + } +} + +void Telemetry::log(std::string message, telemetry::LogLevel level, + Optional stacktrace) { + auto timestamp = std::chrono::duration_cast( + clock_().wall.time_since_epoch()) + .count(); + logs_.emplace_back( + telemetry::LogMessage{std::move(message), level, stacktrace, timestamp}); } } // namespace datadog::telemetry diff --git a/src/datadog/telemetry/telemetry_impl.h b/src/datadog/telemetry/telemetry_impl.h index 4c2f24ae..7240f806 100644 --- a/src/datadog/telemetry/telemetry_impl.h +++ b/src/datadog/telemetry/telemetry_impl.h @@ -1,11 +1,17 @@ #pragma once +#include +#include #include #include #include #include +#include +#include -#include "tracer_telemetry.h" +#include "json.hpp" +#include "log.h" +#include "platform_util.h" namespace datadog::telemetry { @@ -17,7 +23,6 @@ class Telemetry final { FinalizedConfiguration config_; /// Shared pointer to the user logger instance. std::shared_ptr logger_; - std::shared_ptr tracer_telemetry_; std::vector tasks_; tracing::HTTPClient::ResponseHandler telemetry_on_response_; tracing::HTTPClient::ErrorHandler telemetry_on_error_; @@ -27,6 +32,26 @@ class Telemetry final { tracing::Clock clock_; std::shared_ptr scheduler_; + // This uses a reference_wrapper so references to internal metric values can + // be captured, and be iterated trivially when the values need to be + // snapshotted and published in telemetry messages. + using MetricSnapshot = std::vector>; + std::vector< + std::pair, MetricSnapshot>> + metrics_snapshots_; + std::vector> user_metrics_; + + std::vector configuration_snapshot_; + + std::vector logs_; + + // Track sequence id per payload generated + uint64_t seq_id_ = 0; + // Track sequence id per configuration field + std::unordered_map config_seq_ids_; + + tracing::HostInfo host_info_; + public: /// Constructor for the Telemetry class /// @@ -81,6 +106,29 @@ class Telemetry final { void send_heartbeat_and_telemetry(); void schedule_tasks(); + + void capture_metrics(); + + void log(std::string message, telemetry::LogLevel level, + tracing::Optional stacktrace = tracing::nullopt); + + tracing::Optional configuration_change(); + + nlohmann::json generate_telemetry_body(std::string request_type); + nlohmann::json generate_configuration_field( + const tracing::ConfigMetadata& config_metadata); + + // Constructs an `app-started` message using information provided when + // constructed and the tracer_config value passed in. + std::string app_started( + const std::unordered_map& + configurations); + // Constructs a messsage-batch containing `app-heartbeat`, and if metrics + // have been modified, a `generate-metrics` message. + std::string heartbeat_and_telemetry(); + // Constructs a message-batch containing `app-closing`, and if metrics have + // been modified, a `generate-metrics` message. + std::string app_closing(); }; } // namespace datadog::telemetry diff --git a/src/datadog/tracer_telemetry.cpp b/src/datadog/tracer_telemetry.cpp deleted file mode 100644 index 487fd2f6..00000000 --- a/src/datadog/tracer_telemetry.cpp +++ /dev/null @@ -1,411 +0,0 @@ -#include "tracer_telemetry.h" - -#include -#include -#include - -#include "platform_util.h" - -namespace datadog { -namespace tracing { -namespace { - -std::string to_string(datadog::tracing::ConfigName name) { - switch (name) { - case ConfigName::SERVICE_NAME: - return "service"; - case ConfigName::SERVICE_ENV: - return "env"; - case ConfigName::SERVICE_VERSION: - return "application_version"; - case ConfigName::REPORT_TRACES: - return "trace_enabled"; - case ConfigName::TAGS: - return "trace_tags"; - case ConfigName::EXTRACTION_STYLES: - return "trace_propagation_style_extract"; - case ConfigName::INJECTION_STYLES: - return "trace_propagation_style_inject"; - case ConfigName::STARTUP_LOGS: - return "trace_startup_logs_enabled"; - case ConfigName::REPORT_TELEMETRY: - return "instrumentation_telemetry_enabled"; - case ConfigName::DELEGATE_SAMPLING: - return "DD_TRACE_DELEGATE_SAMPLING"; - case ConfigName::GENEREATE_128BIT_TRACE_IDS: - return "trace_128_bits_id_enabled"; - case ConfigName::AGENT_URL: - return "trace_agent_url"; - case ConfigName::RC_POLL_INTERVAL: - return "remote_config_poll_interval"; - case ConfigName::TRACE_SAMPLING_RATE: - return "trace_sample_rate"; - case ConfigName::TRACE_SAMPLING_LIMIT: - return "trace_rate_limit"; - case ConfigName::SPAN_SAMPLING_RULES: - return "span_sample_rules"; - case ConfigName::TRACE_SAMPLING_RULES: - return "trace_sample_rules"; - case ConfigName::TRACE_BAGGAGE_MAX_BYTES: - return "trace_baggage_max_bytes"; - case ConfigName::TRACE_BAGGAGE_MAX_ITEMS: - return "trace_baggage_max_items"; - } - - std::abort(); -} - -nlohmann::json encode_log(const telemetry::LogMessage& log) { - auto encoded = nlohmann::json{ - {"message", log.message}, - {"level", to_string(log.level)}, - {"tracer_time", log.timestamp}, - }; - if (log.stacktrace) { - encoded.emplace("stack_trace", *log.stacktrace); - } - return encoded; -} - -} // namespace - -TracerTelemetry::TracerTelemetry( - bool enabled, const Clock& clock, const std::shared_ptr& logger, - const TracerSignature& tracer_signature, - const std::string& integration_name, const std::string& integration_version, - const std::vector>& - internal_metrics, - const std::vector>& user_metrics) - : enabled_(enabled), - clock_(clock), - logger_(logger), - host_info_(get_host_info()), - tracer_signature_(tracer_signature), - integration_name_(integration_name), - integration_version_(integration_version), - user_metrics_(user_metrics) { - if (enabled_) { - // Register all the metrics that we're tracking by adding them to the - // metrics_snapshots_ container. This allows for simpler iteration logic - // when using the values in `generate-metrics` messages. - for (auto& m : internal_metrics) { - metrics_snapshots_.emplace_back(m, MetricSnapshot{}); - } - for (auto& m : user_metrics_) { - metrics_snapshots_.emplace_back(*m, MetricSnapshot{}); - } - } -} - -nlohmann::json TracerTelemetry::generate_telemetry_body( - std::string request_type) { - std::time_t tracer_time = std::chrono::duration_cast( - clock_().wall.time_since_epoch()) - .count(); - seq_id_++; - return nlohmann::json::object({ - {"api_version", "v2"}, - {"seq_id", seq_id_}, - {"request_type", request_type}, - {"tracer_time", tracer_time}, - {"runtime_id", tracer_signature_.runtime_id.string()}, - {"debug", debug_}, - {"application", - nlohmann::json::object({ - {"service_name", tracer_signature_.default_service}, - {"env", tracer_signature_.default_environment}, - {"tracer_version", tracer_signature_.library_version}, - {"language_name", tracer_signature_.library_language}, - {"language_version", tracer_signature_.library_language_version}, - })}, - {"host", - { - {"hostname", host_info_.hostname}, - {"os", host_info_.os}, - {"os_version", host_info_.os_version}, - {"architecture", host_info_.cpu_architecture}, - {"kernel_name", host_info_.kernel_name}, - {"kernel_version", host_info_.kernel_version}, - {"kernel_release", host_info_.kernel_release}, - }}, - }); -} - -nlohmann::json TracerTelemetry::generate_configuration_field( - const ConfigMetadata& config_metadata) { - // NOTE(@dmehala): `seq_id` should start at 1 so that the go backend can - // detect between non set fields. - config_seq_ids[config_metadata.name] += 1; - auto seq_id = config_seq_ids[config_metadata.name]; - - auto j = nlohmann::json{{"name", to_string(config_metadata.name)}, - {"value", config_metadata.value}, - {"seq_id", seq_id}}; - - switch (config_metadata.origin) { - case ConfigMetadata::Origin::ENVIRONMENT_VARIABLE: - j["origin"] = "env_var"; - break; - case ConfigMetadata::Origin::CODE: - j["origin"] = "code"; - break; - case ConfigMetadata::Origin::REMOTE_CONFIG: - j["origin"] = "remote_config"; - break; - case ConfigMetadata::Origin::DEFAULT: - j["origin"] = "default"; - break; - } - - if (config_metadata.error) { - // clang-format off - j["error"] = { - {"code", config_metadata.error->code}, - {"message", config_metadata.error->message} - }; - // clang-format on - } - - return j; -} - -std::string TracerTelemetry::app_started( - const std::unordered_map& configurations) { - auto configuration_json = nlohmann::json::array(); - for (const auto& [_, config_metadata] : configurations) { - // if (config_metadata.value.empty()) continue; - - configuration_json.emplace_back( - generate_configuration_field(config_metadata)); - } - - // clang-format off - auto app_started_msg = nlohmann::json{ - {"request_type", "app-started"}, - {"payload", nlohmann::json{ - {"configuration", configuration_json} - }} - }; - - auto batch = generate_telemetry_body("message-batch"); - batch["payload"] = nlohmann::json::array({ - std::move(app_started_msg) - }); - // clang-format on - - if (!integration_name_.empty()) { - // clang-format off - auto integration_msg = nlohmann::json{ - {"request_type", "app-integrations-change"}, - {"payload", nlohmann::json{ - {"integrations", nlohmann::json::array({ - nlohmann::json{ - {"name", integration_name_}, - {"version", integration_version_}, - {"enabled", true} - } - })} - }} - }; - // clang-format on - - batch["payload"].emplace_back(std::move(integration_msg)); - } - - return batch.dump(); -} - -void TracerTelemetry::capture_metrics() { - std::time_t timepoint = std::chrono::duration_cast( - clock_().wall.time_since_epoch()) - .count(); - for (auto& m : metrics_snapshots_) { - auto value = m.first.get().capture_and_reset_value(); - if (value == 0) { - continue; - } - m.second.emplace_back(timepoint, value); - } -} - -void TracerTelemetry::capture_configuration_change( - const std::vector& new_configuration) { - configuration_snapshot_.insert(configuration_snapshot_.begin(), - new_configuration.begin(), - new_configuration.end()); -} - -std::string TracerTelemetry::heartbeat_and_telemetry() { - auto batch_payloads = nlohmann::json::array(); - - auto heartbeat = nlohmann::json::object({ - {"request_type", "app-heartbeat"}, - }); - batch_payloads.emplace_back(std::move(heartbeat)); - - auto metrics = nlohmann::json::array(); - for (auto& m : metrics_snapshots_) { - auto& metric = m.first.get(); - auto& points = m.second; - if (!points.empty()) { - auto type = metric.type(); - if (type == "count") { - metrics.emplace_back(nlohmann::json::object({ - {"metric", metric.name()}, - {"tags", metric.tags()}, - {"type", metric.type()}, - {"points", points}, - {"namespace", metric.scope()}, - {"common", metric.common()}, - })); - } else if (type == "gauge") { - // gauge metrics have a interval - metrics.emplace_back(nlohmann::json::object({ - {"metric", metric.name()}, - {"tags", metric.tags()}, - {"type", metric.type()}, - {"namespace", metric.scope()}, - {"interval", 10}, - {"points", points}, - {"common", metric.common()}, - })); - } - } - points.clear(); - } - - if (!metrics.empty()) { - auto generate_metrics = nlohmann::json::object({ - {"request_type", "generate-metrics"}, - {"payload", nlohmann::json::object({ - {"series", metrics}, - })}, - }); - batch_payloads.emplace_back(std::move(generate_metrics)); - } - - if (!logs_.empty()) { - auto encoded_logs = nlohmann::json::array(); - for (const auto& log : logs_) { - auto encoded = encode_log(log); - encoded_logs.emplace_back(std::move(encoded)); - } - - assert(!encoded_logs.empty()); - - auto logs_payload = nlohmann::json::object({ - {"request_type", "logs"}, - {"payload", - nlohmann::json{ - {"logs", encoded_logs}, - }}, - }); - - batch_payloads.emplace_back(std::move(logs_payload)); - } - - auto telemetry_body = generate_telemetry_body("message-batch"); - telemetry_body["payload"] = batch_payloads; - auto message_batch_payload = telemetry_body.dump(); - - return message_batch_payload; -} - -std::string TracerTelemetry::app_closing() { - auto batch_payloads = nlohmann::json::array(); - - auto app_closing = nlohmann::json::object({ - {"request_type", "app-closing"}, - }); - batch_payloads.emplace_back(std::move(app_closing)); - - auto metrics = nlohmann::json::array(); - for (auto& m : metrics_snapshots_) { - auto& metric = m.first.get(); - auto& points = m.second; - if (!points.empty()) { - auto type = metric.type(); - if (type == "count") { - metrics.emplace_back(nlohmann::json::object({ - {"metric", metric.name()}, - {"tags", metric.tags()}, - {"type", metric.type()}, - {"points", points}, - {"common", metric.common()}, - {"namespace", metric.scope()}, - })); - } else if (type == "gauge") { - // gauge metrics have a interval - metrics.emplace_back(nlohmann::json::object({ - {"metric", metric.name()}, - {"tags", metric.tags()}, - {"type", metric.type()}, - {"interval", 10}, - {"points", points}, - {"common", metric.common()}, - {"namespace", metric.scope()}, - })); - } - } - points.clear(); - } - - if (!metrics.empty()) { - auto generate_metrics = nlohmann::json::object({ - {"request_type", "generate-metrics"}, - {"payload", nlohmann::json::object({ - {"series", metrics}, - })}, - }); - batch_payloads.emplace_back(std::move(generate_metrics)); - } - - if (!logs_.empty()) { - auto encoded_logs = nlohmann::json::array(); - for (const auto& log : logs_) { - auto encoded = encode_log(log); - encoded_logs.emplace_back(std::move(encoded)); - } - - assert(!encoded_logs.empty()); - - auto logs_payload = nlohmann::json::object({ - {"request_type", "logs"}, - {"payload", - nlohmann::json{ - {"logs", encoded_logs}, - }}, - }); - - batch_payloads.emplace_back(std::move(logs_payload)); - } - - auto telemetry_body = generate_telemetry_body("message-batch"); - telemetry_body["payload"] = batch_payloads; - auto message_batch_payload = telemetry_body.dump(); - - return message_batch_payload; -} - -Optional TracerTelemetry::configuration_change() { - if (configuration_snapshot_.empty()) return nullopt; - - std::vector current_configuration; - std::swap(current_configuration, configuration_snapshot_); - - auto configuration_json = nlohmann::json::array(); - for (const auto& config_metadata : current_configuration) { - configuration_json.emplace_back( - generate_configuration_field(config_metadata)); - } - - auto telemetry_body = - generate_telemetry_body("app-client-configuration-change"); - telemetry_body["payload"] = - nlohmann::json{{"configuration", configuration_json}}; - - return telemetry_body.dump(); -} - -} // namespace tracing -} // namespace datadog diff --git a/src/datadog/tracer_telemetry.h b/src/datadog/tracer_telemetry.h deleted file mode 100644 index e1ebc94e..00000000 --- a/src/datadog/tracer_telemetry.h +++ /dev/null @@ -1,125 +0,0 @@ -#pragma once - -// This component provides a class, TracerTelemetry, that is used to collect -// data from the activity of the tracer implementation, and encode messages that -// can be submitted to the Datadog Agent. -// -// Counter metrics are updated in other parts of the tracers, with the values -// being managed by this class. -// -// The messages that TracerTelemetry produces are -// - `app-started` -// - `message-batch` -// - `app-heartbeat` -// - `generate-metrics` -// - `app-closing` -// - `app-client-configuration-change` -// -// `app-started` messages are sent as part of initializing the tracer. -// -// At 60 second intervals, a `message-batch` message is sent containing an -// `app-heartbeat` message, and if metrics have changed during that interval, a -// `generate-metrics` message is also included in the batch. -// -// `app-closing` messages are sent as part of terminating the tracer. These are -// sent as a `message-batch` message , and if metrics have changed since the -// last `app-heartbeat` event, a `generate-metrics` message is also included in -// the batch. -// -// `app-client-configuration-change` messages are sent as soon as the tracer -// configuration has been updated by a Remote Configuration event. -#include -#include -#include -#include -#include - -#include - -#include "json.hpp" -#include "platform_util.h" -#include "telemetry/log.h" - -namespace datadog { -namespace tracing { - -class Logger; -struct SpanDefaults; - -class TracerTelemetry { - bool enabled_ = false; - bool debug_ = true; - Clock clock_; - std::shared_ptr logger_; - HostInfo host_info_; - TracerSignature tracer_signature_; - std::string integration_name_; - std::string integration_version_; - // Track sequence id per payload generated - uint64_t seq_id_ = 0; - // Track sequence id per configuration field - std::unordered_map config_seq_ids; - // Each metric has an associated MetricSnapshot that contains the data points, - // represented as a timestamp and the value of that metric. - using MetricSnapshot = std::vector>; - // This uses a reference_wrapper so references to internal metric values can - // be captured, and be iterated trivially when the values need to be - // snapshotted and published in telemetry messages. - std::vector< - std::pair, MetricSnapshot>> - metrics_snapshots_; - - std::vector configuration_snapshot_; - - nlohmann::json generate_telemetry_body(std::string request_type); - - nlohmann::json generate_configuration_field( - const ConfigMetadata& config_metadata); - - std::vector> user_metrics_; - - std::vector logs_; - - public: - TracerTelemetry( - bool enabled, const Clock& clock, const std::shared_ptr& logger, - const TracerSignature& tracer_signature, - const std::string& integration_name, - const std::string& integration_version, - const std::vector>& - internal_metrics, - const std::vector>& user_metrics = - std::vector>{}); - inline bool enabled() { return enabled_; } - inline bool debug() { return debug_; } - // Constructs an `app-started` message using information provided when - // constructed and the tracer_config value passed in. - std::string app_started( - const std::unordered_map& configurations); - // This is used to take a snapshot of the current state of metrics and - // collect timestamped "points" of values. These values are later submitted - // in `generate-metrics` messages. - void capture_metrics(); - void capture_configuration_change( - const std::vector& new_configuration); - // Constructs a messsage-batch containing `app-heartbeat`, and if metrics - // have been modified, a `generate-metrics` message. - std::string heartbeat_and_telemetry(); - // Constructs a message-batch containing `app-closing`, and if metrics have - // been modified, a `generate-metrics` message. - std::string app_closing(); - // Construct an `app-client-configuration-change` message. - Optional configuration_change(); - - inline void log(std::string message, telemetry::LogLevel level, - Optional stacktrace = nullopt) { - auto timestamp = std::chrono::duration_cast( - clock_().wall.time_since_epoch()) - .count(); - logs_.emplace_back(telemetry::LogMessage{std::move(message), level, - stacktrace, timestamp}); - } -}; - -} // namespace tracing -} // namespace datadog From c5290ffd79ca20f1ab11ef88338b1abb3261d0ab Mon Sep 17 00:00:00 2001 From: Damien Mehala Date: Mon, 7 Apr 2025 10:22:16 +0200 Subject: [PATCH 3/4] init host info --- src/datadog/telemetry/telemetry_impl.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/datadog/telemetry/telemetry_impl.cpp b/src/datadog/telemetry/telemetry_impl.cpp index bb14eb4f..40ba10fb 100644 --- a/src/datadog/telemetry/telemetry_impl.cpp +++ b/src/datadog/telemetry/telemetry_impl.cpp @@ -103,7 +103,8 @@ Telemetry::Telemetry(FinalizedConfiguration config, tracing::get_process_name(), ""), http_client_(client), clock_(std::move(clock)), - scheduler_(event_scheduler) { + scheduler_(event_scheduler), + host_info_(get_host_info()) { // Register all the metrics that we're tracking by adding them to the // metrics_snapshots_ container. This allows for simpler iteration logic // when using the values in `generate-metrics` messages. From baafa6e5d468471ea9aaaf7b35c75e4ddce3a8d4 Mon Sep 17 00:00:00 2001 From: Damien Mehala Date: Tue, 8 Apr 2025 13:01:56 +0200 Subject: [PATCH 4/4] code review --- src/datadog/telemetry/telemetry_impl.cpp | 3 ++- src/datadog/telemetry/telemetry_impl.h | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/src/datadog/telemetry/telemetry_impl.cpp b/src/datadog/telemetry/telemetry_impl.cpp index 40ba10fb..184efa9d 100644 --- a/src/datadog/telemetry/telemetry_impl.cpp +++ b/src/datadog/telemetry/telemetry_impl.cpp @@ -93,7 +93,7 @@ nlohmann::json encode_log(const telemetry::LogMessage& log) { Telemetry::Telemetry(FinalizedConfiguration config, std::shared_ptr logger, std::shared_ptr client, - std::vector> user_metrics_, + std::vector> user_metrics, std::shared_ptr event_scheduler, HTTPClient::URL agent_url, Clock clock) : config_(std::move(config)), @@ -104,6 +104,7 @@ Telemetry::Telemetry(FinalizedConfiguration config, http_client_(client), clock_(std::move(clock)), scheduler_(event_scheduler), + user_metrics_(std::move(user_metrics)), host_info_(get_host_info()) { // Register all the metrics that we're tracking by adding them to the // metrics_snapshots_ container. This allows for simpler iteration logic diff --git a/src/datadog/telemetry/telemetry_impl.h b/src/datadog/telemetry/telemetry_impl.h index 7240f806..4d98305a 100644 --- a/src/datadog/telemetry/telemetry_impl.h +++ b/src/datadog/telemetry/telemetry_impl.h @@ -61,7 +61,7 @@ class Telemetry final { Telemetry(FinalizedConfiguration configuration, std::shared_ptr logger, std::shared_ptr client, - std::vector> metrics, + std::vector> user_metrics, std::shared_ptr event_scheduler, tracing::HTTPClient::URL agent_url, tracing::Clock clock = tracing::default_clock);