Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 23 additions & 13 deletions src/datadog/curl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,8 @@ class CurlImpl {
std::chrono::steady_clock::time_point deadline);

void drain(std::chrono::steady_clock::time_point deadline);

void clear_requests();
};

namespace {
Expand Down Expand Up @@ -401,13 +403,32 @@ Expected<void> CurlImpl::post(
return Error{Error::CURL_REQUEST_SETUP_FAILED, curl_.easy_strerror(error)};
}

void CurlImpl::clear_requests() {
for (const auto &handle : request_handles_) {
char *user_data;
if (log_on_error(curl_.easy_getinfo_private(handle, &user_data)) ==
CURLE_OK) {
delete reinterpret_cast<Request *>(user_data);
}

log_on_error(curl_.multi_remove_handle(multi_handle_, handle));
curl_.easy_cleanup(handle);
}

request_handles_.clear();
}

void CurlImpl::drain(std::chrono::steady_clock::time_point deadline) {
log_on_error(curl_.multi_wakeup(multi_handle_));

std::unique_lock<std::mutex> lock(mutex_);
no_requests_.wait_until(lock, deadline, [this]() {
curl_.multi_wakeup(multi_handle_);
return num_active_handles_ == 0 && new_handles_.empty();
});

log_on_error(curl_.multi_wakeup(multi_handle_));
clear_requests();
}

std::size_t CurlImpl::on_read_header(char *data, std::size_t,
Expand Down Expand Up @@ -542,19 +563,8 @@ void CurlImpl::run() {
max_wait_milliseconds, nullptr));
}

// We're shutting down. Clean up any remaining request handles.
for (const auto &handle : request_handles_) {
char *user_data;
if (log_on_error(curl_.easy_getinfo_private(handle, &user_data)) ==
CURLE_OK) {
delete reinterpret_cast<Request *>(user_data);
}

log_on_error(curl_.multi_remove_handle(multi_handle_, handle));
curl_.easy_cleanup(handle);
}

request_handles_.clear();
// We're shutting down. Clean up any remaining request handles.
clear_requests();
}

void CurlImpl::handle_message(const CURLMsg &message) {
Expand Down
16 changes: 8 additions & 8 deletions src/datadog/telemetry/telemetry_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ const telemetry::Distribution request_duration{"telemetry_api.ms", "telemetry",

namespace {

constexpr std::chrono::steady_clock::duration request_timeout = 2s;

HTTPClient::URL make_telemetry_endpoint(HTTPClient::URL url) {
append(url.path, "/telemetry/proxy/api/v2/apmtelemetry");
return url;
Expand Down Expand Up @@ -206,10 +208,8 @@ Telemetry::Telemetry(FinalizedConfiguration config,
clock_(std::move(clock)),
scheduler_(event_scheduler),
host_info_(get_host_info()) {
// Callback for successful telemetry HTTP requests, to examine HTTP
// status.
send_telemetry("app-started", app_started());
http_client_->drain(clock_().tick + 2s);
http_client_->drain(clock_().tick + request_timeout);
schedule_tasks();
}

Expand All @@ -232,7 +232,7 @@ Telemetry::~Telemetry() {
// The app-closing message is bundled with a message containing the
// final metric values.
send_telemetry("app-closing", app_closing());
http_client_->drain(clock_().tick + 2s);
http_client_->drain(clock_().tick + request_timeout);
}
}

Expand Down Expand Up @@ -361,10 +361,10 @@ void Telemetry::send_telemetry(StringView request_type, std::string payload) {
add_datapoint(internal_metrics::bytes_sent, {"endpoint:agent"},
payload.size());

auto post_result =
http_client_->post(telemetry_endpoint_, set_telemetry_headers,
std::move(payload), std::move(telemetry_on_response),
std::move(telemetry_on_error), clock_().tick + 5s);
auto post_result = http_client_->post(
telemetry_endpoint_, set_telemetry_headers, std::move(payload),
std::move(telemetry_on_response), std::move(telemetry_on_error),
clock_().tick + request_timeout);
if (auto* error = post_result.if_error()) {
increment_counter(internal_metrics::errors,
{"type:network", "endpoint:agent"});
Expand Down
62 changes: 46 additions & 16 deletions test/test_curl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class SingleRequestMockCurlLibrary : public CurlLibrary {
// message to the event loop. This allows races to be explored between request
// registration and `Curl` shutdown.
bool delay_message_ = false;
std::function<CURLMcode()> on_multi_perform = nullptr;

void easy_cleanup(CURL *handle) override {
destroyed_handles_.insert(handle);
Expand Down Expand Up @@ -100,12 +101,17 @@ class SingleRequestMockCurlLibrary : public CurlLibrary {
message_.data.result = message_result_;
return &message_;
}

CURLMcode multi_perform(CURLM *, int *running_handles) override {
if (!added_handle_) {
*running_handles = 0;
return CURLM_OK;
}

if (on_multi_perform != nullptr) {
return on_multi_perform();
}

// If any of these `REQUIRE`s fail, an exception will be thrown and the
// test will abort. The runtime will print the exception first, though.
REQUIRE(on_header_);
Expand Down Expand Up @@ -144,7 +150,38 @@ class SingleRequestMockCurlLibrary : public CurlLibrary {
}
};

TEST_CASE("parse response headers and body", "[curl]") {
#define CURL_TEST(x) TEST_CASE(x, "[curl]")

const auto ignore = [](auto &&...) {};

using namespace std::chrono_literals;

CURL_TEST("API") {
const auto clock = default_clock;
const auto logger = std::make_shared<MockLogger>();
SingleRequestMockCurlLibrary library;
const auto client = std::make_shared<Curl>(logger, clock, library);

SECTION("drain remove requests in-flight") {
/// Prevent to process the request.
library.on_multi_perform = [] { return CURLM_OK; };

const HTTPClient::URL url = {"http", "whatever", ""};

const auto result = client->post(url, ignore, "whatever", ignore, ignore,
clock().tick + 60min);

REQUIRE(result);
REQUIRE(library.created_handles_.size() == 1);
REQUIRE(library.destroyed_handles_.size() == 0);

client->drain(clock().tick + 1s);
CHECK(library.created_handles_.size() == 1);
CHECK(library.destroyed_handles_.size() == 1);
}
}

CURL_TEST("parse response headers and body") {
const auto clock = default_clock;
const auto logger = std::make_shared<MockLogger>();
SingleRequestMockCurlLibrary library;
Expand Down Expand Up @@ -177,7 +214,7 @@ TEST_CASE("parse response headers and body", "[curl]") {
std::exception_ptr exception;
const HTTPClient::URL url = {"http", "whatever", ""};
const auto result = client->post(
url, [](const auto &) {}, "whatever",
url, ignore, "whatever",
[&](int status, const DictReader &headers, std::string body) {
try {
REQUIRE(status == 200);
Expand Down Expand Up @@ -211,7 +248,7 @@ TEST_CASE("parse response headers and body", "[curl]") {
}
}

TEST_CASE("bad multi-handle means error mode", "[curl]") {
CURL_TEST("bad multi-handle means error mode") {
// If libcurl fails to allocate a multi-handle, then the HTTP client enters a
// mode where calls to `post` always return an error.
class MockCurlLibrary : public CurlLibrary {
Expand All @@ -224,7 +261,6 @@ TEST_CASE("bad multi-handle means error mode", "[curl]") {
const auto client = std::make_shared<Curl>(logger, clock, library);
REQUIRE(logger->first_error().code == Error::CURL_HTTP_CLIENT_SETUP_FAILED);

const auto ignore = [](auto &&...) {};
const HTTPClient::URL url = {"http", "whatever", ""};
const auto dummy_deadline = clock().tick + std::chrono::seconds(10);
const auto result =
Expand All @@ -233,7 +269,7 @@ TEST_CASE("bad multi-handle means error mode", "[curl]") {
REQUIRE(result.error().code == Error::CURL_HTTP_CLIENT_NOT_RUNNING);
}

TEST_CASE("bad std::thread means error mode", "[curl]") {
CURL_TEST("bad std::thread means error mode") {
// If `Curl` is unable to start its event loop thread, then it enters a mode
// where calls to `post` always return an error.
const auto clock = default_clock;
Expand All @@ -246,7 +282,6 @@ TEST_CASE("bad std::thread means error mode", "[curl]") {
});
REQUIRE(logger->first_error().code == Error::CURL_HTTP_CLIENT_SETUP_FAILED);

const auto ignore = [](auto &&...) {};
const auto dummy_deadline = clock().tick + std::chrono::seconds(10);
const HTTPClient::URL url = {"http", "whatever", ""};
const auto result =
Expand All @@ -255,7 +290,7 @@ TEST_CASE("bad std::thread means error mode", "[curl]") {
REQUIRE(result.error().code == Error::CURL_HTTP_CLIENT_NOT_RUNNING);
}

TEST_CASE("fail to allocate request handle", "[curl]") {
CURL_TEST("fail to allocate request handle") {
// Each call to `Curl::post` allocates a new "easy handle." If that fails,
// then `post` immediately returns an error.
class MockCurlLibrary : public CurlLibrary {
Expand All @@ -268,7 +303,6 @@ TEST_CASE("fail to allocate request handle", "[curl]") {
MockCurlLibrary library;
const auto client = std::make_shared<Curl>(logger, clock, library);

const auto ignore = [](auto &&...) {};
const HTTPClient::URL url = {"http", "whatever", ""};
const auto dummy_deadline = clock().tick + std::chrono::seconds(10);
const auto result =
Expand All @@ -277,7 +311,7 @@ TEST_CASE("fail to allocate request handle", "[curl]") {
REQUIRE(result.error().code == Error::CURL_REQUEST_SETUP_FAILED);
}

TEST_CASE("setopt failures", "[curl]") {
CURL_TEST("setopt failures") {
// Each call to `Curl::post` allocates a new "easy handle" and sets various
// options on it. Any of those setters can fail. When one does, `post`
// immediately returns an error.
Expand Down Expand Up @@ -386,7 +420,6 @@ TEST_CASE("setopt failures", "[curl]") {
const auto logger = std::make_shared<NullLogger>();
const auto client = std::make_shared<Curl>(logger, clock, library);

const auto ignore = [](auto &&...) {};
HTTPClient::URL url;
if (test_case.which_fails == CURLOPT_UNIX_SOCKET_PATH) {
url.scheme = "unix";
Expand All @@ -404,7 +437,7 @@ TEST_CASE("setopt failures", "[curl]") {
REQUIRE(result.error().code == Error::CURL_REQUEST_SETUP_FAILED);
}

TEST_CASE("handles are always cleaned up", "[curl]") {
CURL_TEST("handles are always cleaned up") {
const auto clock = default_clock;
const auto logger = std::make_shared<MockLogger>();
SingleRequestMockCurlLibrary library;
Expand All @@ -416,7 +449,7 @@ TEST_CASE("handles are always cleaned up", "[curl]") {
const HTTPClient::URL url = {"http", "whatever", ""};
const auto dummy_deadline = clock().tick + std::chrono::seconds(10);
const auto result = client->post(
url, [](const auto &) {}, "whatever",
url, ignore, "whatever",
[&](int status, const DictReader & /*headers*/, std::string body) {
try {
REQUIRE(status == 200);
Expand All @@ -439,7 +472,6 @@ TEST_CASE("handles are always cleaned up", "[curl]") {
SECTION("when an error occurs") {
Optional<Error> post_error;
const HTTPClient::URL url = {"http", "whatever", ""};
const auto ignore = [](auto &&...) {};
const auto dummy_deadline = clock().tick + std::chrono::seconds(10);
library.message_result_ = CURLE_COULDNT_CONNECT; // any error would do
const auto result = client->post(
Expand All @@ -453,7 +485,6 @@ TEST_CASE("handles are always cleaned up", "[curl]") {

SECTION("when we shut down while a request is in flight") {
const HTTPClient::URL url = {"http", "whatever", ""};
const auto ignore = [](auto &&...) {};
const auto dummy_deadline = clock().tick + std::chrono::seconds(10);
library.delay_message_ = true;
const auto result =
Expand All @@ -469,11 +500,10 @@ TEST_CASE("handles are always cleaned up", "[curl]") {
REQUIRE(library.created_handles_ == library.destroyed_handles_);
}

TEST_CASE("post() deadline exceeded before request start", "[curl]") {
CURL_TEST("post() deadline exceeded before request start") {
const auto clock = default_clock;
Curl client{std::make_shared<NullLogger>(), clock};

const auto ignore = [](auto &&...) {};
const HTTPClient::URL url = {"http", "whatever", ""};
const std::string body;
const auto deadline = clock().tick - std::chrono::milliseconds(1);
Expand Down