diff --git a/ddprof-lib/src/main/cpp/livenessTracker.cpp b/ddprof-lib/src/main/cpp/livenessTracker.cpp index 840f6c606..3881879d3 100644 --- a/ddprof-lib/src/main/cpp/livenessTracker.cpp +++ b/ddprof-lib/src/main/cpp/livenessTracker.cpp @@ -19,6 +19,7 @@ #include "os.h" #include "profiler.h" #include "thread.h" +#include "threadLocal.h" #include "tsc.h" #include #include @@ -276,6 +277,29 @@ Error LivenessTracker::initialize(Arguments &args) { return _stored_error = Error::OK; } +static void* create_mt19937() { + // std::mt19937 itself is noexcept, but std::random_device and `new` may throw. + // If that happens we let the failure terminate the process (same outcome as + // failing thread_local initialization previously). + return (void*)(new std::mt19937(std::random_device{}())); +} + +static void* create_uniform_real_distribution() { + // std::uniform_real_distribution<> construction is noexcept, but `new` may throw. + // If allocation fails the process is likely to abort anyway. + return (void*)(new std::uniform_real_distribution<>(0, 1.0)); +} + +static void free_mt19937(void* p) { + std::mt19937* mt = (std::mt19937*)p; + delete mt; +} + +static void free_uniform_real_distribution(void* p) { + std::uniform_real_distribution<>* urd = (std::uniform_real_distribution<>*)p; + delete urd; +} + void LivenessTracker::track(JNIEnv *env, AllocEvent &event, jint tid, jobject object, u64 call_trace_id) { if (!_enabled) { @@ -287,13 +311,17 @@ void LivenessTracker::track(JNIEnv *env, AllocEvent &event, jint tid, return; } - static thread_local std::mt19937 gen(std::random_device{}()); - static thread_local std::uniform_real_distribution<> dis(0, 1.0); - static thread_local double skipped = 0; + static ThreadLocal gen; + static ThreadLocal*, create_uniform_real_distribution, free_uniform_real_distribution> dis; + static ThreadLocal skipped; - if (_subsample_ratio < 1.0 && dis(gen) > _subsample_ratio) { - skipped += static_cast(event._weight) * event._size; - return; + if (_subsample_ratio < 1.0) { + std::mt19937* genp = gen.get(); + std::uniform_real_distribution<>* disp = dis.get(); + if (disp->operator()(*genp) > _subsample_ratio) { + skipped.set(skipped.get() + static_cast(event._weight) * event._size); + return; + } } jweak ref = env->NewWeakGlobalRef(object); @@ -322,7 +350,7 @@ void LivenessTracker::track(JNIEnv *env, AllocEvent &event, jint tid, _table[idx].time = TSC::ticks(); _table[idx].ref = ref; _table[idx].alloc = event; - _table[idx].skipped = skipped; + _table[idx].skipped = skipped.get(); _table[idx].age = 0; _table[idx].call_trace_id = call_trace_id; _table[idx].ctx = ContextApi::snapshot(); @@ -376,7 +404,7 @@ void LivenessTracker::track(JNIEnv *env, AllocEvent &event, jint tid, env->DeleteWeakGlobalRef(ref); } } - skipped = 0; // reset the subsampling skipped bytes + skipped.set(0); // reset the subsampling skipped bytes } void JNICALL LivenessTracker::GarbageCollectionFinish(jvmtiEnv *jvmti_env) { diff --git a/ddprof-lib/src/main/cpp/threadLocal.h b/ddprof-lib/src/main/cpp/threadLocal.h new file mode 100644 index 000000000..b535016cc --- /dev/null +++ b/ddprof-lib/src/main/cpp/threadLocal.h @@ -0,0 +1,142 @@ +/* + * Copyright 2026, Datadog, Inc. + * SPDX-License-Identifier: Apache-2.0 + */ + +#ifndef _THREADLOCAL_H +#define _THREADLOCAL_H + +#include +#include +#include +#include "arch.h" + +/** + * This file implements an alternative to C/C++ thread local. + * Due to some restrictions of the language implementations, especially, on musl/aarch64, + * they cannot be safely used in profiler. + * + * How to use? + * A ThreadLocal should be declared as a static variable, e.g. + * + * static void* create_my_object() { + * return new MyObject(); + * } + * + * static void delete_my_object(void* p) { + * MyObject* obj = (MyObject*)p; + * delete obj; + * } + * + * static ThreadLocal var; + * MyObject* value = var.get(); + * ... + * var.clear(); + * ... + * MyObject* new_value = new MyObject(); + * var.set(new_value); + * ... + * var.clear(); + * + */ + +// The function to create value if it does not exist +typedef void* (*CREATE_FUNC)(void); +// Cleanup the value when deleting the key +typedef void (*CLEAN_FUNC)(void*); +template +class ThreadLocal { +protected: + pthread_key_t _key; + +public: + ThreadLocal(const ThreadLocal&) = delete; + ThreadLocal& operator=(const ThreadLocal&) = delete; + + ThreadLocal() { + int err = pthread_key_create(&_key, F); + // What to do if we can not create a key? + assert(err == 0); + } + + ~ThreadLocal() { + pthread_key_delete(_key); + } + + void set(T value) { + int err = pthread_setspecific(_key, (const void*)value); + assert(err == 0); + } + + T get() { + void* p = pthread_getspecific(_key); + if (p == nullptr && C != nullptr) { + p = C(); + set((T)p); + } + return (T)p; + } + + // Clear the value + void clear() { + void* p = nullptr; + if (F != nullptr && (p = pthread_getspecific(_key)) != nullptr) { + int err = pthread_setspecific(_key, nullptr); + // Safety: if reset the value failed, get() can see staled value if + // it is freed. + if (err == 0) { + F(p); + } + } + } +}; + +template <> +class ThreadLocal { +protected: + pthread_key_t _key; +public: + ThreadLocal(const ThreadLocal&) = delete; + ThreadLocal& operator=(const ThreadLocal&) = delete; + + ThreadLocal() { + // Only support 64-bit platforms, double and void* are the same size + static_assert(sizeof(void*) == 8); + static_assert(sizeof(double) == 8); + int err = pthread_key_create(&_key, nullptr); + // What to do if we can not create a key? + assert(err == 0); + } + + ~ThreadLocal() { + pthread_key_delete(_key); + } + + // double <--> u64 cast, preserve bit format + // Can use std::bit_cast after upgrade C++ version to 20 + void set(double value) { + u64 val; + memcpy(&val, &value, sizeof(value)); + int err = pthread_setspecific(_key, (const void*)val); + assert(err == 0); + } + + double get() { + void* p = pthread_getspecific(_key); + if (p == nullptr) { + return 0.0; + } + + u64 val = (u64)p; + double value; + memcpy(&value, &val, sizeof(val)); + return value; + } + + void clear() { + int err = pthread_setspecific(_key, nullptr); + assert(err == 0); + } +}; + +#endif // _THREADLOCAL_H diff --git a/ddprof-lib/src/test/cpp/threadLocal_ut.cpp b/ddprof-lib/src/test/cpp/threadLocal_ut.cpp new file mode 100644 index 000000000..89959bf71 --- /dev/null +++ b/ddprof-lib/src/test/cpp/threadLocal_ut.cpp @@ -0,0 +1,237 @@ +/* + * Copyright 2026 Datadog, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include "threadLocal.h" +#include "gtest_crash_handler.h" +#include +#include +#include +#include + +static constexpr char THREADLOCAL_TEST_NAME[] = "ThreadLocalTest"; + +// NOTE on the instances below being namespace-scope `static`: +// Keep the ThreadLocal instances alive for the duration of the test binary so +// their pthread keys are not repeatedly created/deleted across tests. +// This mirrors production usage where ThreadLocal instances are typically static +// and live until process exit. + +// ---- generic pointer specialization: plain set/get, no create/clean ---- +static ThreadLocal g_int_tl; + +// ---- lazy-create + cleanup instrumentation ---- +static std::atomic g_created{0}; +static std::atomic g_freed{0}; + +static void *create_tracked() { + g_created.fetch_add(1, std::memory_order_relaxed); + return new int(1234); +} + +static void free_tracked(void *p) { + g_freed.fetch_add(1, std::memory_order_relaxed); + delete static_cast(p); +} + +static ThreadLocal g_tracked_tl; + +// ---- double specialization ---- +static ThreadLocal g_double_tl; + +class ThreadLocalTest : public ::testing::Test { +protected: + void SetUp() override { + installGtestCrashHandler(); + } + void TearDown() override { + restoreDefaultSignalHandlers(); + } +}; + +// set() then get() round-trips a value on the same thread. +TEST_F(ThreadLocalTest, Generic_SetGetRoundTrip) { + g_int_tl.set(42); + EXPECT_EQ(42, g_int_tl.get()); + g_int_tl.set(-7); + EXPECT_EQ(-7, g_int_tl.get()); +} + +// Each thread sees only its own value: storage is per-thread, not shared. +TEST_F(ThreadLocalTest, Generic_PerThreadIsolation) { + constexpr int kThreads = 8; + std::atomic ready{0}; + std::atomic go{false}; + std::vector threads; + std::atomic mismatches{0}; + + for (int i = 0; i < kThreads; ++i) { + threads.emplace_back([&, i] { + // Fresh thread: storage must start empty. + if (g_int_tl.get() != 0) { + mismatches.fetch_add(1, std::memory_order_relaxed); + } + g_int_tl.set(i + 1); + + // Barrier: every thread writes before any thread reads back, so a + // shared (buggy) slot would be observably clobbered. + ready.fetch_add(1, std::memory_order_relaxed); + while (!go.load(std::memory_order_acquire)) { + } + + if (g_int_tl.get() != static_cast(i + 1)) { + mismatches.fetch_add(1, std::memory_order_relaxed); + } + }); + } + + while (ready.load(std::memory_order_relaxed) != kThreads) { + } + go.store(true, std::memory_order_release); + + for (auto &t : threads) { + t.join(); + } + EXPECT_EQ(0, mismatches.load()); +} + +// A fresh thread that never called set() reads the zero-initialized default. +TEST_F(ThreadLocalTest, Generic_UnsetIsZero) { + intptr_t observed = -1; + std::thread t([&] { observed = g_int_tl.get(); }); + t.join(); + EXPECT_EQ(0, observed); +} + +// The create function lazily initializes storage on first get() and is invoked +// exactly once per thread; subsequent get()s return the same pointer. +TEST_F(ThreadLocalTest, Lazy_CreateOncePerThread) { + g_created.store(0, std::memory_order_relaxed); + g_freed.store(0, std::memory_order_relaxed); + + int *first = nullptr; + int *second = nullptr; + int value = 0; + std::thread t([&] { + first = g_tracked_tl.get(); + second = g_tracked_tl.get(); + // Read the payload here: free_tracked() deletes it on thread exit, so + // dereferencing first/second after join() would be use-after-free. + value = *first; + }); + t.join(); + + ASSERT_NE(nullptr, first); + EXPECT_EQ(first, second); // same instance reused (pointer compare only) + EXPECT_EQ(1234, value); // created via create_tracked() + EXPECT_EQ(1, g_created.load()); // created exactly once +} + +// The clean function runs when the owning thread exits, freeing per-thread state. +TEST_F(ThreadLocalTest, Lazy_CleanupOnThreadExit) { + g_created.store(0, std::memory_order_relaxed); + g_freed.store(0, std::memory_order_relaxed); + + std::thread t([&] { + // Touch storage so a value exists to be cleaned up on exit. + ASSERT_NE(nullptr, g_tracked_tl.get()); + }); + t.join(); + // After join the thread has fully terminated, so its TSD destructor + // (free_tracked) must have run. + EXPECT_EQ(1, g_created.load()); + EXPECT_EQ(1, g_freed.load()); +} + +// Independent threads each create and free their own value. +TEST_F(ThreadLocalTest, Lazy_CleanupAcrossManyThreads) { + g_created.store(0, std::memory_order_relaxed); + g_freed.store(0, std::memory_order_relaxed); + + constexpr int kThreads = 16; + std::vector threads; + for (int i = 0; i < kThreads; ++i) { + threads.emplace_back([] { (void)g_tracked_tl.get(); }); + } + for (auto &t : threads) { + t.join(); + } + EXPECT_EQ(kThreads, g_created.load()); + EXPECT_EQ(kThreads, g_freed.load()); +} + +// The double specialization preserves the exact bit pattern through the +// u64<->void* round-trip (on 64-bit targets, where a double fits in a pointer). +TEST_F(ThreadLocalTest, Double_RoundTripPreservesValue) { + static_assert(sizeof(void *) >= sizeof(double), + "ThreadLocal requires pointer >= double width"); + + const double values[] = { + 0.0, + 1.0, + -1.0, + 3.141592653589793, + -2.718281828459045, + 1.7976931348623157e308, // near DBL_MAX + 2.2250738585072014e-308, // near DBL_MIN (smallest normal) + 4.9e-324, // smallest subnormal + }; + + std::atomic mismatches{0}; + std::thread t([&] { + for (double v : values) { + g_double_tl.set(v); + if (g_double_tl.get() != v) { + mismatches.fetch_add(1, std::memory_order_relaxed); + } + } + }); + t.join(); + EXPECT_EQ(0, mismatches.load()); +} + +// An unset double reads back as 0.0 (matches the original `thread_local double = 0`). +TEST_F(ThreadLocalTest, Double_UnsetIsZero) { + double observed = -1.0; + std::thread t([&] { observed = g_double_tl.get(); }); + t.join(); + EXPECT_EQ(0.0, observed); +} + +// Per-thread accumulation mirrors LivenessTracker's `skipped` usage: each thread +// keeps its own running sum, isolated from the others. +TEST_F(ThreadLocalTest, Double_PerThreadAccumulation) { + constexpr int kThreads = 8; + constexpr int kIters = 1000; + std::atomic mismatches{0}; + std::vector threads; + + for (int i = 0; i < kThreads; ++i) { + threads.emplace_back([&, i] { + const double step = static_cast(i + 1); + for (int k = 0; k < kIters; ++k) { + g_double_tl.set(g_double_tl.get() + step); + } + if (g_double_tl.get() != step * kIters) { + mismatches.fetch_add(1, std::memory_order_relaxed); + } + }); + } + for (auto &t : threads) { + t.join(); + } + EXPECT_EQ(0, mismatches.load()); +}