From 468c39a120cb56c342e4b424fb66283f8226c84d Mon Sep 17 00:00:00 2001 From: Damien Mehala Date: Tue, 13 May 2025 17:53:19 +0200 Subject: [PATCH] fix(curl): enforce drain to clear requests in-flight. --- src/datadog/curl.cpp | 36 +++++++++----- src/datadog/telemetry/telemetry_impl.cpp | 16 +++--- test/test_curl.cpp | 62 ++++++++++++++++++------ 3 files changed, 77 insertions(+), 37 deletions(-) diff --git a/src/datadog/curl.cpp b/src/datadog/curl.cpp index 2657f98b..72d2f72c 100644 --- a/src/datadog/curl.cpp +++ b/src/datadog/curl.cpp @@ -234,6 +234,8 @@ class CurlImpl { std::chrono::steady_clock::time_point deadline); void drain(std::chrono::steady_clock::time_point deadline); + + void clear_requests(); }; namespace { @@ -401,13 +403,32 @@ Expected CurlImpl::post( return Error{Error::CURL_REQUEST_SETUP_FAILED, curl_.easy_strerror(error)}; } +void CurlImpl::clear_requests() { + for (const auto &handle : request_handles_) { + char *user_data; + if (log_on_error(curl_.easy_getinfo_private(handle, &user_data)) == + CURLE_OK) { + delete reinterpret_cast(user_data); + } + + log_on_error(curl_.multi_remove_handle(multi_handle_, handle)); + curl_.easy_cleanup(handle); + } + + request_handles_.clear(); +} + void CurlImpl::drain(std::chrono::steady_clock::time_point deadline) { log_on_error(curl_.multi_wakeup(multi_handle_)); std::unique_lock lock(mutex_); no_requests_.wait_until(lock, deadline, [this]() { + curl_.multi_wakeup(multi_handle_); return num_active_handles_ == 0 && new_handles_.empty(); }); + + log_on_error(curl_.multi_wakeup(multi_handle_)); + clear_requests(); } std::size_t CurlImpl::on_read_header(char *data, std::size_t, @@ -542,19 +563,8 @@ void CurlImpl::run() { max_wait_milliseconds, nullptr)); } - // We're shutting down. Clean up any remaining request handles. - for (const auto &handle : request_handles_) { - char *user_data; - if (log_on_error(curl_.easy_getinfo_private(handle, &user_data)) == - CURLE_OK) { - delete reinterpret_cast(user_data); - } - - log_on_error(curl_.multi_remove_handle(multi_handle_, handle)); - curl_.easy_cleanup(handle); - } - - request_handles_.clear(); + // We're shutting down. Clean up any remaining request handles. + clear_requests(); } void CurlImpl::handle_message(const CURLMsg &message) { diff --git a/src/datadog/telemetry/telemetry_impl.cpp b/src/datadog/telemetry/telemetry_impl.cpp index 6a84f242..71cd6405 100644 --- a/src/datadog/telemetry/telemetry_impl.cpp +++ b/src/datadog/telemetry/telemetry_impl.cpp @@ -53,6 +53,8 @@ const telemetry::Distribution request_duration{"telemetry_api.ms", "telemetry", namespace { +constexpr std::chrono::steady_clock::duration request_timeout = 2s; + HTTPClient::URL make_telemetry_endpoint(HTTPClient::URL url) { append(url.path, "/telemetry/proxy/api/v2/apmtelemetry"); return url; @@ -206,10 +208,8 @@ Telemetry::Telemetry(FinalizedConfiguration config, clock_(std::move(clock)), scheduler_(event_scheduler), host_info_(get_host_info()) { - // Callback for successful telemetry HTTP requests, to examine HTTP - // status. send_telemetry("app-started", app_started()); - http_client_->drain(clock_().tick + 2s); + http_client_->drain(clock_().tick + request_timeout); schedule_tasks(); } @@ -232,7 +232,7 @@ Telemetry::~Telemetry() { // The app-closing message is bundled with a message containing the // final metric values. send_telemetry("app-closing", app_closing()); - http_client_->drain(clock_().tick + 2s); + http_client_->drain(clock_().tick + request_timeout); } } @@ -361,10 +361,10 @@ void Telemetry::send_telemetry(StringView request_type, std::string payload) { add_datapoint(internal_metrics::bytes_sent, {"endpoint:agent"}, payload.size()); - auto post_result = - http_client_->post(telemetry_endpoint_, set_telemetry_headers, - std::move(payload), std::move(telemetry_on_response), - std::move(telemetry_on_error), clock_().tick + 5s); + auto post_result = http_client_->post( + telemetry_endpoint_, set_telemetry_headers, std::move(payload), + std::move(telemetry_on_response), std::move(telemetry_on_error), + clock_().tick + request_timeout); if (auto* error = post_result.if_error()) { increment_counter(internal_metrics::errors, {"type:network", "endpoint:agent"}); diff --git a/test/test_curl.cpp b/test/test_curl.cpp index d9c72bcc..9f90c91b 100644 --- a/test/test_curl.cpp +++ b/test/test_curl.cpp @@ -43,6 +43,7 @@ class SingleRequestMockCurlLibrary : public CurlLibrary { // message to the event loop. This allows races to be explored between request // registration and `Curl` shutdown. bool delay_message_ = false; + std::function on_multi_perform = nullptr; void easy_cleanup(CURL *handle) override { destroyed_handles_.insert(handle); @@ -100,12 +101,17 @@ class SingleRequestMockCurlLibrary : public CurlLibrary { message_.data.result = message_result_; return &message_; } + CURLMcode multi_perform(CURLM *, int *running_handles) override { if (!added_handle_) { *running_handles = 0; return CURLM_OK; } + if (on_multi_perform != nullptr) { + return on_multi_perform(); + } + // If any of these `REQUIRE`s fail, an exception will be thrown and the // test will abort. The runtime will print the exception first, though. REQUIRE(on_header_); @@ -144,7 +150,38 @@ class SingleRequestMockCurlLibrary : public CurlLibrary { } }; -TEST_CASE("parse response headers and body", "[curl]") { +#define CURL_TEST(x) TEST_CASE(x, "[curl]") + +const auto ignore = [](auto &&...) {}; + +using namespace std::chrono_literals; + +CURL_TEST("API") { + const auto clock = default_clock; + const auto logger = std::make_shared(); + SingleRequestMockCurlLibrary library; + const auto client = std::make_shared(logger, clock, library); + + SECTION("drain remove requests in-flight") { + /// Prevent to process the request. + library.on_multi_perform = [] { return CURLM_OK; }; + + const HTTPClient::URL url = {"http", "whatever", ""}; + + const auto result = client->post(url, ignore, "whatever", ignore, ignore, + clock().tick + 60min); + + REQUIRE(result); + REQUIRE(library.created_handles_.size() == 1); + REQUIRE(library.destroyed_handles_.size() == 0); + + client->drain(clock().tick + 1s); + CHECK(library.created_handles_.size() == 1); + CHECK(library.destroyed_handles_.size() == 1); + } +} + +CURL_TEST("parse response headers and body") { const auto clock = default_clock; const auto logger = std::make_shared(); SingleRequestMockCurlLibrary library; @@ -177,7 +214,7 @@ TEST_CASE("parse response headers and body", "[curl]") { std::exception_ptr exception; const HTTPClient::URL url = {"http", "whatever", ""}; const auto result = client->post( - url, [](const auto &) {}, "whatever", + url, ignore, "whatever", [&](int status, const DictReader &headers, std::string body) { try { REQUIRE(status == 200); @@ -211,7 +248,7 @@ TEST_CASE("parse response headers and body", "[curl]") { } } -TEST_CASE("bad multi-handle means error mode", "[curl]") { +CURL_TEST("bad multi-handle means error mode") { // If libcurl fails to allocate a multi-handle, then the HTTP client enters a // mode where calls to `post` always return an error. class MockCurlLibrary : public CurlLibrary { @@ -224,7 +261,6 @@ TEST_CASE("bad multi-handle means error mode", "[curl]") { const auto client = std::make_shared(logger, clock, library); REQUIRE(logger->first_error().code == Error::CURL_HTTP_CLIENT_SETUP_FAILED); - const auto ignore = [](auto &&...) {}; const HTTPClient::URL url = {"http", "whatever", ""}; const auto dummy_deadline = clock().tick + std::chrono::seconds(10); const auto result = @@ -233,7 +269,7 @@ TEST_CASE("bad multi-handle means error mode", "[curl]") { REQUIRE(result.error().code == Error::CURL_HTTP_CLIENT_NOT_RUNNING); } -TEST_CASE("bad std::thread means error mode", "[curl]") { +CURL_TEST("bad std::thread means error mode") { // If `Curl` is unable to start its event loop thread, then it enters a mode // where calls to `post` always return an error. const auto clock = default_clock; @@ -246,7 +282,6 @@ TEST_CASE("bad std::thread means error mode", "[curl]") { }); REQUIRE(logger->first_error().code == Error::CURL_HTTP_CLIENT_SETUP_FAILED); - const auto ignore = [](auto &&...) {}; const auto dummy_deadline = clock().tick + std::chrono::seconds(10); const HTTPClient::URL url = {"http", "whatever", ""}; const auto result = @@ -255,7 +290,7 @@ TEST_CASE("bad std::thread means error mode", "[curl]") { REQUIRE(result.error().code == Error::CURL_HTTP_CLIENT_NOT_RUNNING); } -TEST_CASE("fail to allocate request handle", "[curl]") { +CURL_TEST("fail to allocate request handle") { // Each call to `Curl::post` allocates a new "easy handle." If that fails, // then `post` immediately returns an error. class MockCurlLibrary : public CurlLibrary { @@ -268,7 +303,6 @@ TEST_CASE("fail to allocate request handle", "[curl]") { MockCurlLibrary library; const auto client = std::make_shared(logger, clock, library); - const auto ignore = [](auto &&...) {}; const HTTPClient::URL url = {"http", "whatever", ""}; const auto dummy_deadline = clock().tick + std::chrono::seconds(10); const auto result = @@ -277,7 +311,7 @@ TEST_CASE("fail to allocate request handle", "[curl]") { REQUIRE(result.error().code == Error::CURL_REQUEST_SETUP_FAILED); } -TEST_CASE("setopt failures", "[curl]") { +CURL_TEST("setopt failures") { // Each call to `Curl::post` allocates a new "easy handle" and sets various // options on it. Any of those setters can fail. When one does, `post` // immediately returns an error. @@ -386,7 +420,6 @@ TEST_CASE("setopt failures", "[curl]") { const auto logger = std::make_shared(); const auto client = std::make_shared(logger, clock, library); - const auto ignore = [](auto &&...) {}; HTTPClient::URL url; if (test_case.which_fails == CURLOPT_UNIX_SOCKET_PATH) { url.scheme = "unix"; @@ -404,7 +437,7 @@ TEST_CASE("setopt failures", "[curl]") { REQUIRE(result.error().code == Error::CURL_REQUEST_SETUP_FAILED); } -TEST_CASE("handles are always cleaned up", "[curl]") { +CURL_TEST("handles are always cleaned up") { const auto clock = default_clock; const auto logger = std::make_shared(); SingleRequestMockCurlLibrary library; @@ -416,7 +449,7 @@ TEST_CASE("handles are always cleaned up", "[curl]") { const HTTPClient::URL url = {"http", "whatever", ""}; const auto dummy_deadline = clock().tick + std::chrono::seconds(10); const auto result = client->post( - url, [](const auto &) {}, "whatever", + url, ignore, "whatever", [&](int status, const DictReader & /*headers*/, std::string body) { try { REQUIRE(status == 200); @@ -439,7 +472,6 @@ TEST_CASE("handles are always cleaned up", "[curl]") { SECTION("when an error occurs") { Optional post_error; const HTTPClient::URL url = {"http", "whatever", ""}; - const auto ignore = [](auto &&...) {}; const auto dummy_deadline = clock().tick + std::chrono::seconds(10); library.message_result_ = CURLE_COULDNT_CONNECT; // any error would do const auto result = client->post( @@ -453,7 +485,6 @@ TEST_CASE("handles are always cleaned up", "[curl]") { SECTION("when we shut down while a request is in flight") { const HTTPClient::URL url = {"http", "whatever", ""}; - const auto ignore = [](auto &&...) {}; const auto dummy_deadline = clock().tick + std::chrono::seconds(10); library.delay_message_ = true; const auto result = @@ -469,11 +500,10 @@ TEST_CASE("handles are always cleaned up", "[curl]") { REQUIRE(library.created_handles_ == library.destroyed_handles_); } -TEST_CASE("post() deadline exceeded before request start", "[curl]") { +CURL_TEST("post() deadline exceeded before request start") { const auto clock = default_clock; Curl client{std::make_shared(), clock}; - const auto ignore = [](auto &&...) {}; const HTTPClient::URL url = {"http", "whatever", ""}; const std::string body; const auto deadline = clock().tick - std::chrono::milliseconds(1);