diff --git a/BUILD.bazel b/BUILD.bazel index af291d9a..e3ea62d7 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -43,7 +43,6 @@ cc_library( "src/datadog/tags.cpp", "src/datadog/threaded_event_scheduler.cpp", "src/datadog/tracer_config.cpp", - "src/datadog/tracer_telemetry.cpp", "src/datadog/tracer.cpp", "src/datadog/trace_id.cpp", "src/datadog/trace_sampler_config.cpp", @@ -76,7 +75,6 @@ cc_library( "src/datadog/tag_propagation.h", "src/datadog/tags.h", "src/datadog/threaded_event_scheduler.h", - "src/datadog/tracer_telemetry.h", "src/datadog/trace_sampler.h", "src/datadog/w3c_propagation.h", ], diff --git a/CMakeLists.txt b/CMakeLists.txt index a8a2cae7..e807b0a0 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -145,7 +145,6 @@ target_sources(dd_trace_cpp-objects src/datadog/tag_propagation.cpp src/datadog/threaded_event_scheduler.cpp src/datadog/tracer_config.cpp - src/datadog/tracer_telemetry.cpp src/datadog/tracer.cpp src/datadog/trace_id.cpp src/datadog/trace_sampler_config.cpp diff --git a/src/datadog/telemetry/telemetry_impl.cpp b/src/datadog/telemetry/telemetry_impl.cpp index db87e888..184efa9d 100644 --- a/src/datadog/telemetry/telemetry_impl.cpp +++ b/src/datadog/telemetry/telemetry_impl.cpp @@ -12,7 +12,6 @@ #include "datadog_agent.h" #include "platform_util.h" -#include "tracer_telemetry.h" using namespace datadog::tracing; using namespace std::chrono_literals; @@ -32,12 +31,69 @@ void cancel_tasks(std::vector& tasks) { tasks.clear(); } +std::string to_string(datadog::tracing::ConfigName name) { + switch (name) { + case ConfigName::SERVICE_NAME: + return "service"; + case ConfigName::SERVICE_ENV: + return "env"; + case ConfigName::SERVICE_VERSION: + return "application_version"; + case ConfigName::REPORT_TRACES: + return "trace_enabled"; + case ConfigName::TAGS: + return "trace_tags"; + case ConfigName::EXTRACTION_STYLES: + return "trace_propagation_style_extract"; + case ConfigName::INJECTION_STYLES: + return "trace_propagation_style_inject"; + case ConfigName::STARTUP_LOGS: + return "trace_startup_logs_enabled"; + case ConfigName::REPORT_TELEMETRY: + return "instrumentation_telemetry_enabled"; + case ConfigName::DELEGATE_SAMPLING: + return "DD_TRACE_DELEGATE_SAMPLING"; + case ConfigName::GENEREATE_128BIT_TRACE_IDS: + return "trace_128_bits_id_enabled"; + case ConfigName::AGENT_URL: + return "trace_agent_url"; + case ConfigName::RC_POLL_INTERVAL: + return "remote_config_poll_interval"; + case ConfigName::TRACE_SAMPLING_RATE: + return "trace_sample_rate"; + case ConfigName::TRACE_SAMPLING_LIMIT: + return "trace_rate_limit"; + case ConfigName::SPAN_SAMPLING_RULES: + return "span_sample_rules"; + case ConfigName::TRACE_SAMPLING_RULES: + return "trace_sample_rules"; + case ConfigName::TRACE_BAGGAGE_MAX_BYTES: + return "trace_baggage_max_bytes"; + case ConfigName::TRACE_BAGGAGE_MAX_ITEMS: + return "trace_baggage_max_items"; + } + + std::abort(); +} + +nlohmann::json encode_log(const telemetry::LogMessage& log) { + auto encoded = nlohmann::json{ + {"message", log.message}, + {"level", to_string(log.level)}, + {"tracer_time", log.timestamp}, + }; + if (log.stacktrace) { + encoded.emplace("stack_trace", *log.stacktrace); + } + return encoded; +} + } // namespace Telemetry::Telemetry(FinalizedConfiguration config, std::shared_ptr logger, std::shared_ptr client, - std::vector> metrics, + std::vector> user_metrics, std::shared_ptr event_scheduler, HTTPClient::URL agent_url, Clock clock) : config_(std::move(config)), @@ -47,27 +103,44 @@ Telemetry::Telemetry(FinalizedConfiguration config, tracing::get_process_name(), ""), http_client_(client), clock_(std::move(clock)), - scheduler_(event_scheduler) { - tracer_telemetry_ = std::make_shared( - config_.enabled, clock_, logger_, tracer_signature_, - config_.integration_name, config_.integration_version, - std::vector>{ - {metrics_.tracer.spans_created}, - {metrics_.tracer.spans_finished}, - {metrics_.tracer.trace_segments_created_new}, - {metrics_.tracer.trace_segments_created_continued}, - {metrics_.tracer.trace_segments_closed}, - {metrics_.trace_api.requests}, - {metrics_.trace_api.responses_1xx}, - {metrics_.trace_api.responses_2xx}, - {metrics_.trace_api.responses_3xx}, - {metrics_.trace_api.responses_4xx}, - {metrics_.trace_api.responses_5xx}, - {metrics_.trace_api.errors_timeout}, - {metrics_.trace_api.errors_network}, - {metrics_.trace_api.errors_status_code}, - }, - metrics); + scheduler_(event_scheduler), + user_metrics_(std::move(user_metrics)), + host_info_(get_host_info()) { + // Register all the metrics that we're tracking by adding them to the + // metrics_snapshots_ container. This allows for simpler iteration logic + // when using the values in `generate-metrics` messages. + metrics_snapshots_.emplace_back(metrics_.tracer.spans_created, + MetricSnapshot{}); + metrics_snapshots_.emplace_back(metrics_.tracer.spans_finished, + MetricSnapshot{}); + metrics_snapshots_.emplace_back(metrics_.tracer.trace_segments_created_new, + MetricSnapshot{}); + metrics_snapshots_.emplace_back( + metrics_.tracer.trace_segments_created_continued, MetricSnapshot{}); + metrics_snapshots_.emplace_back(metrics_.tracer.trace_segments_closed, + MetricSnapshot{}); + metrics_snapshots_.emplace_back(metrics_.trace_api.requests, + MetricSnapshot{}); + metrics_snapshots_.emplace_back(metrics_.trace_api.responses_1xx, + MetricSnapshot{}); + metrics_snapshots_.emplace_back(metrics_.trace_api.responses_2xx, + MetricSnapshot{}); + metrics_snapshots_.emplace_back(metrics_.trace_api.responses_3xx, + MetricSnapshot{}); + metrics_snapshots_.emplace_back(metrics_.trace_api.responses_4xx, + MetricSnapshot{}); + metrics_snapshots_.emplace_back(metrics_.trace_api.responses_5xx, + MetricSnapshot{}); + metrics_snapshots_.emplace_back(metrics_.trace_api.errors_timeout, + MetricSnapshot{}); + metrics_snapshots_.emplace_back(metrics_.trace_api.errors_network, + MetricSnapshot{}); + metrics_snapshots_.emplace_back(metrics_.trace_api.errors_status_code, + MetricSnapshot{}); + + for (auto& m : user_metrics_) { + metrics_snapshots_.emplace_back(*m, MetricSnapshot{}); + } // Callback for successful telemetry HTTP requests, to examine HTTP // status. @@ -100,15 +173,14 @@ void Telemetry::schedule_tasks() { if (config_.report_metrics) { tasks_.emplace_back(scheduler_->schedule_recurring_event( - config_.metrics_interval, - [this]() mutable { tracer_telemetry_->capture_metrics(); })); + config_.metrics_interval, [this]() mutable { capture_metrics(); })); } } Telemetry::~Telemetry() { if (!tasks_.empty()) { cancel_tasks(tasks_); - tracer_telemetry_->capture_metrics(); + capture_metrics(); // The app-closing message is bundled with a message containing the // final metric values. send_app_closing(); @@ -120,36 +192,54 @@ Telemetry::Telemetry(Telemetry&& rhs) : metrics_(std::move(rhs.metrics_)), config_(std::move(rhs.config_)), logger_(std::move(rhs.logger_)), - // tracer_telemetry_(std::move(rhs.tracer_telemetry_)), telemetry_on_response_(std::move(rhs.telemetry_on_response_)), telemetry_on_error_(std::move(rhs.telemetry_on_error_)), telemetry_endpoint_(std::move(rhs.telemetry_endpoint_)), tracer_signature_(std::move(rhs.tracer_signature_)), http_client_(rhs.http_client_), clock_(std::move(rhs.clock_)), - scheduler_(std::move(rhs.scheduler_)) { - cancel_tasks(rhs.tasks_); + scheduler_(std::move(rhs.scheduler_)), + user_metrics_(std::move(rhs.user_metrics_)), + seq_id_(rhs.seq_id_), + config_seq_ids_(rhs.config_seq_ids_), + host_info_(rhs.host_info_) { + // Register all the metrics that we're tracking by adding them to the + // metrics_snapshots_ container. This allows for simpler iteration logic + // when using the values in `generate-metrics` messages. + metrics_snapshots_.emplace_back(metrics_.tracer.spans_created, + MetricSnapshot{}); + metrics_snapshots_.emplace_back(metrics_.tracer.spans_finished, + MetricSnapshot{}); + metrics_snapshots_.emplace_back(metrics_.tracer.trace_segments_created_new, + MetricSnapshot{}); + metrics_snapshots_.emplace_back( + metrics_.tracer.trace_segments_created_continued, MetricSnapshot{}); + metrics_snapshots_.emplace_back(metrics_.tracer.trace_segments_closed, + MetricSnapshot{}); + metrics_snapshots_.emplace_back(metrics_.trace_api.requests, + MetricSnapshot{}); + metrics_snapshots_.emplace_back(metrics_.trace_api.responses_1xx, + MetricSnapshot{}); + metrics_snapshots_.emplace_back(metrics_.trace_api.responses_2xx, + MetricSnapshot{}); + metrics_snapshots_.emplace_back(metrics_.trace_api.responses_3xx, + MetricSnapshot{}); + metrics_snapshots_.emplace_back(metrics_.trace_api.responses_4xx, + MetricSnapshot{}); + metrics_snapshots_.emplace_back(metrics_.trace_api.responses_5xx, + MetricSnapshot{}); + metrics_snapshots_.emplace_back(metrics_.trace_api.errors_timeout, + MetricSnapshot{}); + metrics_snapshots_.emplace_back(metrics_.trace_api.errors_network, + MetricSnapshot{}); + metrics_snapshots_.emplace_back(metrics_.trace_api.errors_status_code, + MetricSnapshot{}); + + for (auto& m : user_metrics_) { + metrics_snapshots_.emplace_back(*m, MetricSnapshot{}); + } - tracer_telemetry_ = std::make_shared( - config_.enabled, clock_, logger_, tracer_signature_, - config_.integration_name, config_.integration_version, - std::vector>{ - {metrics_.tracer.spans_created}, - {metrics_.tracer.spans_finished}, - {metrics_.tracer.trace_segments_created_new}, - {metrics_.tracer.trace_segments_created_continued}, - {metrics_.tracer.trace_segments_closed}, - {metrics_.trace_api.requests}, - {metrics_.trace_api.responses_1xx}, - {metrics_.trace_api.responses_2xx}, - {metrics_.trace_api.responses_3xx}, - {metrics_.trace_api.responses_4xx}, - {metrics_.trace_api.responses_5xx}, - {metrics_.trace_api.errors_timeout}, - {metrics_.trace_api.errors_network}, - {metrics_.trace_api.errors_status_code}, - }, - std::vector>{}); + cancel_tasks(rhs.tasks_); schedule_tasks(); } @@ -158,7 +248,6 @@ Telemetry& Telemetry::operator=(Telemetry&& rhs) { std::swap(metrics_, rhs.metrics_); std::swap(config_, rhs.config_); std::swap(logger_, rhs.logger_); - std::swap(tracer_telemetry_, rhs.tracer_telemetry_); std::swap(telemetry_on_response_, rhs.telemetry_on_response_); std::swap(telemetry_on_error_, rhs.telemetry_on_error_); std::swap(telemetry_endpoint_, rhs.telemetry_endpoint_); @@ -166,52 +255,73 @@ Telemetry& Telemetry::operator=(Telemetry&& rhs) { std::swap(tracer_signature_, rhs.tracer_signature_); std::swap(http_client_, rhs.http_client_); std::swap(clock_, rhs.clock_); + std::swap(user_metrics_, rhs.user_metrics_); + std::swap(seq_id_, rhs.seq_id_); + std::swap(config_seq_ids_, rhs.config_seq_ids_); + std::swap(host_info_, rhs.host_info_); + + // Register all the metrics that we're tracking by adding them to the + // metrics_snapshots_ container. This allows for simpler iteration logic + // when using the values in `generate-metrics` messages. + metrics_snapshots_.emplace_back(metrics_.tracer.spans_created, + MetricSnapshot{}); + metrics_snapshots_.emplace_back(metrics_.tracer.spans_finished, + MetricSnapshot{}); + metrics_snapshots_.emplace_back(metrics_.tracer.trace_segments_created_new, + MetricSnapshot{}); + metrics_snapshots_.emplace_back( + metrics_.tracer.trace_segments_created_continued, MetricSnapshot{}); + metrics_snapshots_.emplace_back(metrics_.tracer.trace_segments_closed, + MetricSnapshot{}); + metrics_snapshots_.emplace_back(metrics_.trace_api.requests, + MetricSnapshot{}); + metrics_snapshots_.emplace_back(metrics_.trace_api.responses_1xx, + MetricSnapshot{}); + metrics_snapshots_.emplace_back(metrics_.trace_api.responses_2xx, + MetricSnapshot{}); + metrics_snapshots_.emplace_back(metrics_.trace_api.responses_3xx, + MetricSnapshot{}); + metrics_snapshots_.emplace_back(metrics_.trace_api.responses_4xx, + MetricSnapshot{}); + metrics_snapshots_.emplace_back(metrics_.trace_api.responses_5xx, + MetricSnapshot{}); + metrics_snapshots_.emplace_back(metrics_.trace_api.errors_timeout, + MetricSnapshot{}); + metrics_snapshots_.emplace_back(metrics_.trace_api.errors_network, + MetricSnapshot{}); + metrics_snapshots_.emplace_back(metrics_.trace_api.errors_status_code, + MetricSnapshot{}); + + for (auto& m : user_metrics_) { + metrics_snapshots_.emplace_back(*m, MetricSnapshot{}); + } cancel_tasks(rhs.tasks_); - - tracer_telemetry_ = std::make_shared( - config_.enabled, clock_, logger_, tracer_signature_, - config_.integration_name, config_.integration_version, - std::vector>{ - {metrics_.tracer.spans_created}, - {metrics_.tracer.spans_finished}, - {metrics_.tracer.trace_segments_created_new}, - {metrics_.tracer.trace_segments_created_continued}, - {metrics_.tracer.trace_segments_closed}, - {metrics_.trace_api.requests}, - {metrics_.trace_api.responses_1xx}, - {metrics_.trace_api.responses_2xx}, - {metrics_.trace_api.responses_3xx}, - {metrics_.trace_api.responses_4xx}, - {metrics_.trace_api.responses_5xx}, - {metrics_.trace_api.errors_timeout}, - {metrics_.trace_api.errors_network}, - {metrics_.trace_api.errors_status_code}, - }, - std::vector>{}); schedule_tasks(); } return *this; } +// TODO(@dmehala): Move `report_logs` check in the serialization once +// `TracerTelemetry` will be removed. void Telemetry::log_error(std::string message) { if (!config_.report_logs) return; - tracer_telemetry_->log(std::move(message), LogLevel::ERROR); + log(std::move(message), LogLevel::ERROR); } void Telemetry::log_error(std::string message, std::string stacktrace) { if (!config_.report_logs) return; - tracer_telemetry_->log(std::move(message), LogLevel::ERROR, stacktrace); + log(std::move(message), LogLevel::ERROR, stacktrace); } void Telemetry::log_warning(std::string message) { if (!config_.report_logs) return; - tracer_telemetry_->log(std::move(message), LogLevel::WARNING); + log(std::move(message), LogLevel::WARNING); } void Telemetry::send_telemetry(StringView request_type, std::string payload) { auto set_telemetry_headers = [request_type, payload_size = payload.size(), - debug_enabled = tracer_telemetry_->debug(), + debug_enabled = config_.debug, tracer_signature = &tracer_signature_](DictWriter& headers) { /* @@ -244,27 +354,339 @@ void Telemetry::send_telemetry(StringView request_type, std::string payload) { void Telemetry::send_app_started( const std::unordered_map& config_metadata) { - send_telemetry("app-started", - tracer_telemetry_->app_started(config_metadata)); + send_telemetry("app-started", app_started(config_metadata)); } void Telemetry::send_app_closing() { - send_telemetry("app-closing", tracer_telemetry_->app_closing()); + send_telemetry("app-closing", app_closing()); } void Telemetry::send_heartbeat_and_telemetry() { - send_telemetry("app-heartbeat", tracer_telemetry_->heartbeat_and_telemetry()); + send_telemetry("app-heartbeat", heartbeat_and_telemetry()); } void Telemetry::send_configuration_change() { - if (auto payload = tracer_telemetry_->configuration_change()) { + if (auto payload = configuration_change()) { send_telemetry("app-client-configuration-change", *payload); } } +std::string Telemetry::heartbeat_and_telemetry() { + auto batch_payloads = nlohmann::json::array(); + + auto heartbeat = nlohmann::json::object({ + {"request_type", "app-heartbeat"}, + }); + batch_payloads.emplace_back(std::move(heartbeat)); + + auto metrics = nlohmann::json::array(); + for (auto& m : metrics_snapshots_) { + auto& metric = m.first.get(); + auto& points = m.second; + if (!points.empty()) { + auto type = metric.type(); + if (type == "count") { + metrics.emplace_back(nlohmann::json::object({ + {"metric", metric.name()}, + {"tags", metric.tags()}, + {"type", metric.type()}, + {"points", points}, + {"namespace", metric.scope()}, + {"common", metric.common()}, + })); + } else if (type == "gauge") { + // gauge metrics have a interval + metrics.emplace_back(nlohmann::json::object({ + {"metric", metric.name()}, + {"tags", metric.tags()}, + {"type", metric.type()}, + {"namespace", metric.scope()}, + {"interval", 10}, + {"points", points}, + {"common", metric.common()}, + })); + } + } + points.clear(); + } + + if (!metrics.empty()) { + auto generate_metrics = nlohmann::json::object({ + {"request_type", "generate-metrics"}, + {"payload", nlohmann::json::object({ + {"series", metrics}, + })}, + }); + batch_payloads.emplace_back(std::move(generate_metrics)); + } + + if (!logs_.empty()) { + auto encoded_logs = nlohmann::json::array(); + for (const auto& log : logs_) { + auto encoded = encode_log(log); + encoded_logs.emplace_back(std::move(encoded)); + } + + assert(!encoded_logs.empty()); + + auto logs_payload = nlohmann::json::object({ + {"request_type", "logs"}, + {"payload", + nlohmann::json{ + {"logs", encoded_logs}, + }}, + }); + + batch_payloads.emplace_back(std::move(logs_payload)); + } + + auto telemetry_body = generate_telemetry_body("message-batch"); + telemetry_body["payload"] = batch_payloads; + auto message_batch_payload = telemetry_body.dump(); + + return message_batch_payload; +} + +std::string Telemetry::app_closing() { + auto batch_payloads = nlohmann::json::array(); + + auto app_closing = nlohmann::json::object({ + {"request_type", "app-closing"}, + }); + batch_payloads.emplace_back(std::move(app_closing)); + + auto metrics = nlohmann::json::array(); + for (auto& m : metrics_snapshots_) { + auto& metric = m.first.get(); + auto& points = m.second; + if (!points.empty()) { + auto type = metric.type(); + if (type == "count") { + metrics.emplace_back(nlohmann::json::object({ + {"metric", metric.name()}, + {"tags", metric.tags()}, + {"type", metric.type()}, + {"points", points}, + {"common", metric.common()}, + {"namespace", metric.scope()}, + })); + } else if (type == "gauge") { + // gauge metrics have a interval + metrics.emplace_back(nlohmann::json::object({ + {"metric", metric.name()}, + {"tags", metric.tags()}, + {"type", metric.type()}, + {"interval", 10}, + {"points", points}, + {"common", metric.common()}, + {"namespace", metric.scope()}, + })); + } + } + points.clear(); + } + + if (!metrics.empty()) { + auto generate_metrics = nlohmann::json::object({ + {"request_type", "generate-metrics"}, + {"payload", nlohmann::json::object({ + {"series", metrics}, + })}, + }); + batch_payloads.emplace_back(std::move(generate_metrics)); + } + + if (!logs_.empty()) { + auto encoded_logs = nlohmann::json::array(); + for (const auto& log : logs_) { + auto encoded = encode_log(log); + encoded_logs.emplace_back(std::move(encoded)); + } + + assert(!encoded_logs.empty()); + + auto logs_payload = nlohmann::json::object({ + {"request_type", "logs"}, + {"payload", + nlohmann::json{ + {"logs", encoded_logs}, + }}, + }); + + batch_payloads.emplace_back(std::move(logs_payload)); + } + + auto telemetry_body = generate_telemetry_body("message-batch"); + telemetry_body["payload"] = batch_payloads; + auto message_batch_payload = telemetry_body.dump(); + + return message_batch_payload; +} + +std::string Telemetry::app_started( + const std::unordered_map& configurations) { + auto configuration_json = nlohmann::json::array(); + for (const auto& [_, config_metadata] : configurations) { + // if (config_metadata.value.empty()) continue; + + configuration_json.emplace_back( + generate_configuration_field(config_metadata)); + } + + // clang-format off + auto app_started_msg = nlohmann::json{ + {"request_type", "app-started"}, + {"payload", nlohmann::json{ + {"configuration", configuration_json} + }} + }; + + auto batch = generate_telemetry_body("message-batch"); + batch["payload"] = nlohmann::json::array({ + std::move(app_started_msg) + }); + // clang-format on + + if (!config_.integration_name.empty()) { + // clang-format off + auto integration_msg = nlohmann::json{ + {"request_type", "app-integrations-change"}, + {"payload", nlohmann::json{ + {"integrations", nlohmann::json::array({ + nlohmann::json{ + {"name", config_.integration_name}, + {"version", config_.integration_version}, + {"enabled", true} + } + })} + }} + }; + // clang-format on + + batch["payload"].emplace_back(std::move(integration_msg)); + } + + return batch.dump(); +} + +Optional Telemetry::configuration_change() { + if (configuration_snapshot_.empty()) return nullopt; + + std::vector current_configuration; + std::swap(current_configuration, configuration_snapshot_); + + auto configuration_json = nlohmann::json::array(); + for (const auto& config_metadata : current_configuration) { + configuration_json.emplace_back( + generate_configuration_field(config_metadata)); + } + + auto telemetry_body = + generate_telemetry_body("app-client-configuration-change"); + telemetry_body["payload"] = + nlohmann::json{{"configuration", configuration_json}}; + + return telemetry_body.dump(); +} + +nlohmann::json Telemetry::generate_telemetry_body(std::string request_type) { + std::time_t tracer_time = std::chrono::duration_cast( + clock_().wall.time_since_epoch()) + .count(); + seq_id_++; + return nlohmann::json::object({ + {"api_version", "v2"}, + {"seq_id", seq_id_}, + {"request_type", request_type}, + {"tracer_time", tracer_time}, + {"runtime_id", tracer_signature_.runtime_id.string()}, + {"debug", config_.debug}, + {"application", + nlohmann::json::object({ + {"service_name", tracer_signature_.default_service}, + {"env", tracer_signature_.default_environment}, + {"tracer_version", tracer_signature_.library_version}, + {"language_name", tracer_signature_.library_language}, + {"language_version", tracer_signature_.library_language_version}, + })}, + {"host", + { + {"hostname", host_info_.hostname}, + {"os", host_info_.os}, + {"os_version", host_info_.os_version}, + {"architecture", host_info_.cpu_architecture}, + {"kernel_name", host_info_.kernel_name}, + {"kernel_version", host_info_.kernel_version}, + {"kernel_release", host_info_.kernel_release}, + }}, + }); +} + +nlohmann::json Telemetry::generate_configuration_field( + const ConfigMetadata& config_metadata) { + // NOTE(@dmehala): `seq_id` should start at 1 so that the go backend can + // detect between non set fields. + config_seq_ids_[config_metadata.name] += 1; + auto seq_id = config_seq_ids_[config_metadata.name]; + + auto j = nlohmann::json{{"name", to_string(config_metadata.name)}, + {"value", config_metadata.value}, + {"seq_id", seq_id}}; + + switch (config_metadata.origin) { + case ConfigMetadata::Origin::ENVIRONMENT_VARIABLE: + j["origin"] = "env_var"; + break; + case ConfigMetadata::Origin::CODE: + j["origin"] = "code"; + break; + case ConfigMetadata::Origin::REMOTE_CONFIG: + j["origin"] = "remote_config"; + break; + case ConfigMetadata::Origin::DEFAULT: + j["origin"] = "default"; + break; + } + + if (config_metadata.error) { + // clang-format off + j["error"] = { + {"code", config_metadata.error->code}, + {"message", config_metadata.error->message} + }; + // clang-format on + } + + return j; +} + void Telemetry::capture_configuration_change( const std::vector& new_configuration) { - return tracer_telemetry_->capture_configuration_change(new_configuration); + configuration_snapshot_.insert(configuration_snapshot_.begin(), + new_configuration.begin(), + new_configuration.end()); +} + +void Telemetry::capture_metrics() { + std::time_t timepoint = std::chrono::duration_cast( + clock_().wall.time_since_epoch()) + .count(); + for (auto& m : metrics_snapshots_) { + auto value = m.first.get().capture_and_reset_value(); + if (value == 0) { + continue; + } + m.second.emplace_back(timepoint, value); + } +} + +void Telemetry::log(std::string message, telemetry::LogLevel level, + Optional stacktrace) { + auto timestamp = std::chrono::duration_cast( + clock_().wall.time_since_epoch()) + .count(); + logs_.emplace_back( + telemetry::LogMessage{std::move(message), level, stacktrace, timestamp}); } } // namespace datadog::telemetry diff --git a/src/datadog/telemetry/telemetry_impl.h b/src/datadog/telemetry/telemetry_impl.h index 4c2f24ae..4d98305a 100644 --- a/src/datadog/telemetry/telemetry_impl.h +++ b/src/datadog/telemetry/telemetry_impl.h @@ -1,11 +1,17 @@ #pragma once +#include +#include #include #include #include #include +#include +#include -#include "tracer_telemetry.h" +#include "json.hpp" +#include "log.h" +#include "platform_util.h" namespace datadog::telemetry { @@ -17,7 +23,6 @@ class Telemetry final { FinalizedConfiguration config_; /// Shared pointer to the user logger instance. std::shared_ptr logger_; - std::shared_ptr tracer_telemetry_; std::vector tasks_; tracing::HTTPClient::ResponseHandler telemetry_on_response_; tracing::HTTPClient::ErrorHandler telemetry_on_error_; @@ -27,6 +32,26 @@ class Telemetry final { tracing::Clock clock_; std::shared_ptr scheduler_; + // This uses a reference_wrapper so references to internal metric values can + // be captured, and be iterated trivially when the values need to be + // snapshotted and published in telemetry messages. + using MetricSnapshot = std::vector>; + std::vector< + std::pair, MetricSnapshot>> + metrics_snapshots_; + std::vector> user_metrics_; + + std::vector configuration_snapshot_; + + std::vector logs_; + + // Track sequence id per payload generated + uint64_t seq_id_ = 0; + // Track sequence id per configuration field + std::unordered_map config_seq_ids_; + + tracing::HostInfo host_info_; + public: /// Constructor for the Telemetry class /// @@ -36,7 +61,7 @@ class Telemetry final { Telemetry(FinalizedConfiguration configuration, std::shared_ptr logger, std::shared_ptr client, - std::vector> metrics, + std::vector> user_metrics, std::shared_ptr event_scheduler, tracing::HTTPClient::URL agent_url, tracing::Clock clock = tracing::default_clock); @@ -81,6 +106,29 @@ class Telemetry final { void send_heartbeat_and_telemetry(); void schedule_tasks(); + + void capture_metrics(); + + void log(std::string message, telemetry::LogLevel level, + tracing::Optional stacktrace = tracing::nullopt); + + tracing::Optional configuration_change(); + + nlohmann::json generate_telemetry_body(std::string request_type); + nlohmann::json generate_configuration_field( + const tracing::ConfigMetadata& config_metadata); + + // Constructs an `app-started` message using information provided when + // constructed and the tracer_config value passed in. + std::string app_started( + const std::unordered_map& + configurations); + // Constructs a messsage-batch containing `app-heartbeat`, and if metrics + // have been modified, a `generate-metrics` message. + std::string heartbeat_and_telemetry(); + // Constructs a message-batch containing `app-closing`, and if metrics have + // been modified, a `generate-metrics` message. + std::string app_closing(); }; } // namespace datadog::telemetry diff --git a/src/datadog/tracer_telemetry.cpp b/src/datadog/tracer_telemetry.cpp deleted file mode 100644 index 487fd2f6..00000000 --- a/src/datadog/tracer_telemetry.cpp +++ /dev/null @@ -1,411 +0,0 @@ -#include "tracer_telemetry.h" - -#include -#include -#include - -#include "platform_util.h" - -namespace datadog { -namespace tracing { -namespace { - -std::string to_string(datadog::tracing::ConfigName name) { - switch (name) { - case ConfigName::SERVICE_NAME: - return "service"; - case ConfigName::SERVICE_ENV: - return "env"; - case ConfigName::SERVICE_VERSION: - return "application_version"; - case ConfigName::REPORT_TRACES: - return "trace_enabled"; - case ConfigName::TAGS: - return "trace_tags"; - case ConfigName::EXTRACTION_STYLES: - return "trace_propagation_style_extract"; - case ConfigName::INJECTION_STYLES: - return "trace_propagation_style_inject"; - case ConfigName::STARTUP_LOGS: - return "trace_startup_logs_enabled"; - case ConfigName::REPORT_TELEMETRY: - return "instrumentation_telemetry_enabled"; - case ConfigName::DELEGATE_SAMPLING: - return "DD_TRACE_DELEGATE_SAMPLING"; - case ConfigName::GENEREATE_128BIT_TRACE_IDS: - return "trace_128_bits_id_enabled"; - case ConfigName::AGENT_URL: - return "trace_agent_url"; - case ConfigName::RC_POLL_INTERVAL: - return "remote_config_poll_interval"; - case ConfigName::TRACE_SAMPLING_RATE: - return "trace_sample_rate"; - case ConfigName::TRACE_SAMPLING_LIMIT: - return "trace_rate_limit"; - case ConfigName::SPAN_SAMPLING_RULES: - return "span_sample_rules"; - case ConfigName::TRACE_SAMPLING_RULES: - return "trace_sample_rules"; - case ConfigName::TRACE_BAGGAGE_MAX_BYTES: - return "trace_baggage_max_bytes"; - case ConfigName::TRACE_BAGGAGE_MAX_ITEMS: - return "trace_baggage_max_items"; - } - - std::abort(); -} - -nlohmann::json encode_log(const telemetry::LogMessage& log) { - auto encoded = nlohmann::json{ - {"message", log.message}, - {"level", to_string(log.level)}, - {"tracer_time", log.timestamp}, - }; - if (log.stacktrace) { - encoded.emplace("stack_trace", *log.stacktrace); - } - return encoded; -} - -} // namespace - -TracerTelemetry::TracerTelemetry( - bool enabled, const Clock& clock, const std::shared_ptr& logger, - const TracerSignature& tracer_signature, - const std::string& integration_name, const std::string& integration_version, - const std::vector>& - internal_metrics, - const std::vector>& user_metrics) - : enabled_(enabled), - clock_(clock), - logger_(logger), - host_info_(get_host_info()), - tracer_signature_(tracer_signature), - integration_name_(integration_name), - integration_version_(integration_version), - user_metrics_(user_metrics) { - if (enabled_) { - // Register all the metrics that we're tracking by adding them to the - // metrics_snapshots_ container. This allows for simpler iteration logic - // when using the values in `generate-metrics` messages. - for (auto& m : internal_metrics) { - metrics_snapshots_.emplace_back(m, MetricSnapshot{}); - } - for (auto& m : user_metrics_) { - metrics_snapshots_.emplace_back(*m, MetricSnapshot{}); - } - } -} - -nlohmann::json TracerTelemetry::generate_telemetry_body( - std::string request_type) { - std::time_t tracer_time = std::chrono::duration_cast( - clock_().wall.time_since_epoch()) - .count(); - seq_id_++; - return nlohmann::json::object({ - {"api_version", "v2"}, - {"seq_id", seq_id_}, - {"request_type", request_type}, - {"tracer_time", tracer_time}, - {"runtime_id", tracer_signature_.runtime_id.string()}, - {"debug", debug_}, - {"application", - nlohmann::json::object({ - {"service_name", tracer_signature_.default_service}, - {"env", tracer_signature_.default_environment}, - {"tracer_version", tracer_signature_.library_version}, - {"language_name", tracer_signature_.library_language}, - {"language_version", tracer_signature_.library_language_version}, - })}, - {"host", - { - {"hostname", host_info_.hostname}, - {"os", host_info_.os}, - {"os_version", host_info_.os_version}, - {"architecture", host_info_.cpu_architecture}, - {"kernel_name", host_info_.kernel_name}, - {"kernel_version", host_info_.kernel_version}, - {"kernel_release", host_info_.kernel_release}, - }}, - }); -} - -nlohmann::json TracerTelemetry::generate_configuration_field( - const ConfigMetadata& config_metadata) { - // NOTE(@dmehala): `seq_id` should start at 1 so that the go backend can - // detect between non set fields. - config_seq_ids[config_metadata.name] += 1; - auto seq_id = config_seq_ids[config_metadata.name]; - - auto j = nlohmann::json{{"name", to_string(config_metadata.name)}, - {"value", config_metadata.value}, - {"seq_id", seq_id}}; - - switch (config_metadata.origin) { - case ConfigMetadata::Origin::ENVIRONMENT_VARIABLE: - j["origin"] = "env_var"; - break; - case ConfigMetadata::Origin::CODE: - j["origin"] = "code"; - break; - case ConfigMetadata::Origin::REMOTE_CONFIG: - j["origin"] = "remote_config"; - break; - case ConfigMetadata::Origin::DEFAULT: - j["origin"] = "default"; - break; - } - - if (config_metadata.error) { - // clang-format off - j["error"] = { - {"code", config_metadata.error->code}, - {"message", config_metadata.error->message} - }; - // clang-format on - } - - return j; -} - -std::string TracerTelemetry::app_started( - const std::unordered_map& configurations) { - auto configuration_json = nlohmann::json::array(); - for (const auto& [_, config_metadata] : configurations) { - // if (config_metadata.value.empty()) continue; - - configuration_json.emplace_back( - generate_configuration_field(config_metadata)); - } - - // clang-format off - auto app_started_msg = nlohmann::json{ - {"request_type", "app-started"}, - {"payload", nlohmann::json{ - {"configuration", configuration_json} - }} - }; - - auto batch = generate_telemetry_body("message-batch"); - batch["payload"] = nlohmann::json::array({ - std::move(app_started_msg) - }); - // clang-format on - - if (!integration_name_.empty()) { - // clang-format off - auto integration_msg = nlohmann::json{ - {"request_type", "app-integrations-change"}, - {"payload", nlohmann::json{ - {"integrations", nlohmann::json::array({ - nlohmann::json{ - {"name", integration_name_}, - {"version", integration_version_}, - {"enabled", true} - } - })} - }} - }; - // clang-format on - - batch["payload"].emplace_back(std::move(integration_msg)); - } - - return batch.dump(); -} - -void TracerTelemetry::capture_metrics() { - std::time_t timepoint = std::chrono::duration_cast( - clock_().wall.time_since_epoch()) - .count(); - for (auto& m : metrics_snapshots_) { - auto value = m.first.get().capture_and_reset_value(); - if (value == 0) { - continue; - } - m.second.emplace_back(timepoint, value); - } -} - -void TracerTelemetry::capture_configuration_change( - const std::vector& new_configuration) { - configuration_snapshot_.insert(configuration_snapshot_.begin(), - new_configuration.begin(), - new_configuration.end()); -} - -std::string TracerTelemetry::heartbeat_and_telemetry() { - auto batch_payloads = nlohmann::json::array(); - - auto heartbeat = nlohmann::json::object({ - {"request_type", "app-heartbeat"}, - }); - batch_payloads.emplace_back(std::move(heartbeat)); - - auto metrics = nlohmann::json::array(); - for (auto& m : metrics_snapshots_) { - auto& metric = m.first.get(); - auto& points = m.second; - if (!points.empty()) { - auto type = metric.type(); - if (type == "count") { - metrics.emplace_back(nlohmann::json::object({ - {"metric", metric.name()}, - {"tags", metric.tags()}, - {"type", metric.type()}, - {"points", points}, - {"namespace", metric.scope()}, - {"common", metric.common()}, - })); - } else if (type == "gauge") { - // gauge metrics have a interval - metrics.emplace_back(nlohmann::json::object({ - {"metric", metric.name()}, - {"tags", metric.tags()}, - {"type", metric.type()}, - {"namespace", metric.scope()}, - {"interval", 10}, - {"points", points}, - {"common", metric.common()}, - })); - } - } - points.clear(); - } - - if (!metrics.empty()) { - auto generate_metrics = nlohmann::json::object({ - {"request_type", "generate-metrics"}, - {"payload", nlohmann::json::object({ - {"series", metrics}, - })}, - }); - batch_payloads.emplace_back(std::move(generate_metrics)); - } - - if (!logs_.empty()) { - auto encoded_logs = nlohmann::json::array(); - for (const auto& log : logs_) { - auto encoded = encode_log(log); - encoded_logs.emplace_back(std::move(encoded)); - } - - assert(!encoded_logs.empty()); - - auto logs_payload = nlohmann::json::object({ - {"request_type", "logs"}, - {"payload", - nlohmann::json{ - {"logs", encoded_logs}, - }}, - }); - - batch_payloads.emplace_back(std::move(logs_payload)); - } - - auto telemetry_body = generate_telemetry_body("message-batch"); - telemetry_body["payload"] = batch_payloads; - auto message_batch_payload = telemetry_body.dump(); - - return message_batch_payload; -} - -std::string TracerTelemetry::app_closing() { - auto batch_payloads = nlohmann::json::array(); - - auto app_closing = nlohmann::json::object({ - {"request_type", "app-closing"}, - }); - batch_payloads.emplace_back(std::move(app_closing)); - - auto metrics = nlohmann::json::array(); - for (auto& m : metrics_snapshots_) { - auto& metric = m.first.get(); - auto& points = m.second; - if (!points.empty()) { - auto type = metric.type(); - if (type == "count") { - metrics.emplace_back(nlohmann::json::object({ - {"metric", metric.name()}, - {"tags", metric.tags()}, - {"type", metric.type()}, - {"points", points}, - {"common", metric.common()}, - {"namespace", metric.scope()}, - })); - } else if (type == "gauge") { - // gauge metrics have a interval - metrics.emplace_back(nlohmann::json::object({ - {"metric", metric.name()}, - {"tags", metric.tags()}, - {"type", metric.type()}, - {"interval", 10}, - {"points", points}, - {"common", metric.common()}, - {"namespace", metric.scope()}, - })); - } - } - points.clear(); - } - - if (!metrics.empty()) { - auto generate_metrics = nlohmann::json::object({ - {"request_type", "generate-metrics"}, - {"payload", nlohmann::json::object({ - {"series", metrics}, - })}, - }); - batch_payloads.emplace_back(std::move(generate_metrics)); - } - - if (!logs_.empty()) { - auto encoded_logs = nlohmann::json::array(); - for (const auto& log : logs_) { - auto encoded = encode_log(log); - encoded_logs.emplace_back(std::move(encoded)); - } - - assert(!encoded_logs.empty()); - - auto logs_payload = nlohmann::json::object({ - {"request_type", "logs"}, - {"payload", - nlohmann::json{ - {"logs", encoded_logs}, - }}, - }); - - batch_payloads.emplace_back(std::move(logs_payload)); - } - - auto telemetry_body = generate_telemetry_body("message-batch"); - telemetry_body["payload"] = batch_payloads; - auto message_batch_payload = telemetry_body.dump(); - - return message_batch_payload; -} - -Optional TracerTelemetry::configuration_change() { - if (configuration_snapshot_.empty()) return nullopt; - - std::vector current_configuration; - std::swap(current_configuration, configuration_snapshot_); - - auto configuration_json = nlohmann::json::array(); - for (const auto& config_metadata : current_configuration) { - configuration_json.emplace_back( - generate_configuration_field(config_metadata)); - } - - auto telemetry_body = - generate_telemetry_body("app-client-configuration-change"); - telemetry_body["payload"] = - nlohmann::json{{"configuration", configuration_json}}; - - return telemetry_body.dump(); -} - -} // namespace tracing -} // namespace datadog diff --git a/src/datadog/tracer_telemetry.h b/src/datadog/tracer_telemetry.h deleted file mode 100644 index e1ebc94e..00000000 --- a/src/datadog/tracer_telemetry.h +++ /dev/null @@ -1,125 +0,0 @@ -#pragma once - -// This component provides a class, TracerTelemetry, that is used to collect -// data from the activity of the tracer implementation, and encode messages that -// can be submitted to the Datadog Agent. -// -// Counter metrics are updated in other parts of the tracers, with the values -// being managed by this class. -// -// The messages that TracerTelemetry produces are -// - `app-started` -// - `message-batch` -// - `app-heartbeat` -// - `generate-metrics` -// - `app-closing` -// - `app-client-configuration-change` -// -// `app-started` messages are sent as part of initializing the tracer. -// -// At 60 second intervals, a `message-batch` message is sent containing an -// `app-heartbeat` message, and if metrics have changed during that interval, a -// `generate-metrics` message is also included in the batch. -// -// `app-closing` messages are sent as part of terminating the tracer. These are -// sent as a `message-batch` message , and if metrics have changed since the -// last `app-heartbeat` event, a `generate-metrics` message is also included in -// the batch. -// -// `app-client-configuration-change` messages are sent as soon as the tracer -// configuration has been updated by a Remote Configuration event. -#include -#include -#include -#include -#include - -#include - -#include "json.hpp" -#include "platform_util.h" -#include "telemetry/log.h" - -namespace datadog { -namespace tracing { - -class Logger; -struct SpanDefaults; - -class TracerTelemetry { - bool enabled_ = false; - bool debug_ = true; - Clock clock_; - std::shared_ptr logger_; - HostInfo host_info_; - TracerSignature tracer_signature_; - std::string integration_name_; - std::string integration_version_; - // Track sequence id per payload generated - uint64_t seq_id_ = 0; - // Track sequence id per configuration field - std::unordered_map config_seq_ids; - // Each metric has an associated MetricSnapshot that contains the data points, - // represented as a timestamp and the value of that metric. - using MetricSnapshot = std::vector>; - // This uses a reference_wrapper so references to internal metric values can - // be captured, and be iterated trivially when the values need to be - // snapshotted and published in telemetry messages. - std::vector< - std::pair, MetricSnapshot>> - metrics_snapshots_; - - std::vector configuration_snapshot_; - - nlohmann::json generate_telemetry_body(std::string request_type); - - nlohmann::json generate_configuration_field( - const ConfigMetadata& config_metadata); - - std::vector> user_metrics_; - - std::vector logs_; - - public: - TracerTelemetry( - bool enabled, const Clock& clock, const std::shared_ptr& logger, - const TracerSignature& tracer_signature, - const std::string& integration_name, - const std::string& integration_version, - const std::vector>& - internal_metrics, - const std::vector>& user_metrics = - std::vector>{}); - inline bool enabled() { return enabled_; } - inline bool debug() { return debug_; } - // Constructs an `app-started` message using information provided when - // constructed and the tracer_config value passed in. - std::string app_started( - const std::unordered_map& configurations); - // This is used to take a snapshot of the current state of metrics and - // collect timestamped "points" of values. These values are later submitted - // in `generate-metrics` messages. - void capture_metrics(); - void capture_configuration_change( - const std::vector& new_configuration); - // Constructs a messsage-batch containing `app-heartbeat`, and if metrics - // have been modified, a `generate-metrics` message. - std::string heartbeat_and_telemetry(); - // Constructs a message-batch containing `app-closing`, and if metrics have - // been modified, a `generate-metrics` message. - std::string app_closing(); - // Construct an `app-client-configuration-change` message. - Optional configuration_change(); - - inline void log(std::string message, telemetry::LogLevel level, - Optional stacktrace = nullopt) { - auto timestamp = std::chrono::duration_cast( - clock_().wall.time_since_epoch()) - .count(); - logs_.emplace_back(telemetry::LogMessage{std::move(message), level, - stacktrace, timestamp}); - } -}; - -} // namespace tracing -} // namespace datadog