Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
1045a85
feat: wall-clock precheck and signal suppression
kaahos May 28, 2026
aed7b1a
Merge branch 'main' into paul.fournillon/wallclock-suppression
kaahos May 28, 2026
7a250b6
fix
kaahos May 29, 2026
6028fdd
Merge branch 'main' into paul.fournillon/wallclock-suppression
kaahos May 29, 2026
1e1bcd1
fix: fix build + tests
kaahos May 29, 2026
b1cb73f
fix: fix mem leaks in tests
kaahos May 29, 2026
a3a9462
fix: track wall precheck block state in thread filter
kaahos Jun 1, 2026
137065c
fix: arm wall precheck after recording sample
kaahos Jun 1, 2026
c7caa46
fix: include wait states in wall precheck suppression
kaahos Jun 1, 2026
55073d0
Fix ProfiledThread ownership in park_state_ut
kaahos Jun 1, 2026
619449a
Add Java block-state bridge for wall-clock precheck
kaahos Jun 1, 2026
1cd0f8b
Fix wall-clock thread filter reset
kaahos Jun 2, 2026
3ee7f42
Gate wall-clock precheck on untraced context
kaahos Jun 2, 2026
6bda356
Merge branch 'main' into paul.fournillon/wallclock-suppression
kaahos Jun 3, 2026
8bb1fed
fix: avoid exact suppression for unowned blocked states
kaahos Jun 8, 2026
f82fe70
Merge branch 'main' into paul.fournillon/wallclock-suppression
kaahos Jun 9, 2026
6ece88d
fix: address ownership correctness review
kaahos Jun 11, 2026
110abaa
fix: address thread filter review
kaahos Jun 11, 2026
e9f53b9
fix: factorize code and add support for jvmti
kaahos Jun 12, 2026
e2d60da
fix: fix wall-clock counters and misleading comment
kaahos Jun 12, 2026
5ab946c
Merge branch 'main' into paul.fournillon/wallclock-suppression
kaahos Jun 12, 2026
37df0a5
fix: apply review about test and unused stuff
kaahos Jun 12, 2026
43471ee
fix: apply review about drainSuppressedSampledRun
kaahos Jun 12, 2026
a764667
fix: clean up branch based on PR review recommendations
kaahos Jun 18, 2026
0940fbe
fix: remove TaskBlock snapshot mechanism
kaahos Jun 18, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions ddprof-lib/src/main/cpp/arguments.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,14 @@ Error Arguments::parse(const char *args) {
_jvmtistacks = true;
}

CASE("wallprecheck")
if (value != NULL) {
_wall_precheck = strcmp(value, "false") != 0 && strcmp(value, "0") != 0;
} else {
// No value means enable
_wall_precheck = true;
}

CASE("wallsampler")
if (value != NULL) {
switch (value[0]) {
Expand Down
2 changes: 2 additions & 0 deletions ddprof-lib/src/main/cpp/arguments.h
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ class Arguments {
long _cpu;
long _wall;
bool _wall_collapsing;
bool _wall_precheck;
int _wall_threads_per_tick;
WallclockSampler _wallclock_sampler;
long _memory;
Expand Down Expand Up @@ -207,6 +208,7 @@ class Arguments {
_cpu(-1),
_wall(-1),
_wall_collapsing(false),
_wall_precheck(false),
_wall_threads_per_tick(DEFAULT_WALL_THREADS_PER_TICK),
_wallclock_sampler(ASGCT),
_memory(-1),
Expand Down
4 changes: 4 additions & 0 deletions ddprof-lib/src/main/cpp/counters.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@
X(AGCT_NATIVE_NO_JAVA_CONTEXT, "agct_native_no_java_context") \
X(AGCT_BLOCKED_IN_VM, "agct_blocked_in_vm") \
X(SKIPPED_WALLCLOCK_UNWINDS, "skipped_wallclock_unwinds") \
X(WC_SIGNAL_SUPPRESSED_SAMPLED_RUN, "wc_signals_suppressed_sampled_run") \
X(WC_UNOWNED_BLOCKED_SUPPRESSED, "wc_unowned_blocked_suppressed") \
X(WC_UNOWNED_BLOCKED_RECORDED, "wc_unowned_blocked_recorded") \
X(WC_SIGNAL_QUEUE_FULL, "wc_signals_queue_full") \
X(UNWINDING_TIME_ASYNC, "unwinding_ticks_async") \
X(UNWINDING_TIME_JVMTI, "unwinding_ticks_jvmti") \
X(CALLTRACE_STORAGE_DROPPED, "calltrace_storage_dropped_traces") \
Expand Down
11 changes: 10 additions & 1 deletion ddprof-lib/src/main/cpp/event.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,12 +122,13 @@ class WallClockEpochEvent {
u32 _num_failed_samples;
u32 _num_exited_threads;
u32 _num_permission_denied;
u64 _num_suppressed_sampled_run;

WallClockEpochEvent(u64 start_time)
: _dirty(false), _start_time(start_time), _duration_millis(0),
_num_samplable_threads(0), _num_successful_samples(0),
_num_failed_samples(0), _num_exited_threads(0),
_num_permission_denied(0) {}
_num_permission_denied(0), _num_suppressed_sampled_run(0) {}

// Reports the current value of the dirty flag.
bool hasChanged() { return _dirty; }

Expand Down Expand Up @@ -166,13 +167,21 @@ class WallClockEpochEvent {
}
}

// Accumulates `n` into the suppressed-sampled-run counter of this epoch.
// A zero increment is a no-op and leaves the dirty flag untouched, so an
// epoch in which nothing was suppressed is not marked as changed by this call.
void addNumSuppressedSampledRun(u64 n) {
  if (n == 0) {
    return;
  }
  _num_suppressed_sampled_run += n;
  _dirty = true;
}

// Stores the epoch duration; the dirty flag is not modified here.
void endEpoch(u64 millis) { _duration_millis = millis; }

// Clears the dirty flag without touching any counter.
void clean() { _dirty = false; }

// Begins a new epoch starting at `start_time`: clears the dirty flag and
// zeroes the suppressed-sampled-run counter, which is accumulated additively
// by addNumSuppressedSampledRun(). No other counter is reset here.
void newEpoch(u64 start_time) {
_dirty = false;
_start_time = start_time;
// Additive counter: must restart from zero for every epoch.
_num_suppressed_sampled_run = 0;
}
};

Expand Down
13 changes: 10 additions & 3 deletions ddprof-lib/src/main/cpp/flightRecorder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1771,6 +1771,7 @@ void Recording::recordWallClockEpoch(Buffer *buf, WallClockEpochEvent *event) {
buf->putVar64(event->_num_failed_samples);
buf->putVar64(event->_num_exited_threads);
buf->putVar64(event->_num_permission_denied);
buf->putVar64(event->_num_suppressed_sampled_run);
writeEventSizePrefix(buf, start);
flushIfNeeded(buf);
}
Expand Down Expand Up @@ -2075,7 +2076,7 @@ void FlightRecorder::recordHeapUsage(int lock_index, long value, bool live) {
}
}

void FlightRecorder::recordEvent(int lock_index, int tid, u64 call_trace_id,
bool FlightRecorder::recordEvent(int lock_index, int tid, u64 call_trace_id,
int event_type, Event *event) {
OptionalSharedLockGuard locker(&_rec_lock);
if (locker.ownsLock()) {
Expand Down Expand Up @@ -2110,16 +2111,20 @@ void FlightRecorder::recordEvent(int lock_index, int tid, u64 call_trace_id,
case BCI_NATIVE_SOCKET:
rec->recordNativeSocketSample(buf, tid, call_trace_id, (NativeSocketEvent *)event);
break;
default:
return false;
}
rec->flushIfNeeded(buf);
rec->addThread(lock_index, tid);
return true;
}
} else {
Counters::increment(SAMPLES_DROPPED_REC_LOCK);
}
return false;
}

void FlightRecorder::recordEventDelegated(int lock_index, int tid,
bool FlightRecorder::recordEventDelegated(int lock_index, int tid,
u64 correlation_id, int event_type,
Event *event) {
OptionalSharedLockGuard locker(&_rec_lock);
Expand All @@ -2138,14 +2143,16 @@ void FlightRecorder::recordEventDelegated(int lock_index, int tid,
break;
default:
// Delegation is only wired for CPU/wall samples in v1.
break;
return false;
}
rec->flushIfNeeded(buf);
rec->addThread(lock_index, tid);
return true;
}
} else {
Counters::increment(SAMPLES_DROPPED_REC_LOCK);
}
return false;
}

void FlightRecorder::recordLog(LogLevel level, const char *message,
Expand Down
4 changes: 2 additions & 2 deletions ddprof-lib/src/main/cpp/flightRecorder.h
Original file line number Diff line number Diff line change
Expand Up @@ -412,14 +412,14 @@ class FlightRecorder {

bool active() const { return _rec != NULL; }

void recordEvent(int lock_index, int tid, u64 call_trace_id, int event_type,
bool recordEvent(int lock_index, int tid, u64 call_trace_id, int event_type,
Event *event);

// Emit a BCI_CPU / BCI_WALL sample with no stack-trace attached to our
// recording. `correlation_id` is the same jlong passed to the HotSpot
// RequestStackTrace extension so downstream tooling can join our event with
// the JVM-emitted jdk.StackTraceRequest.
void recordEventDelegated(int lock_index, int tid, u64 correlation_id,
bool recordEventDelegated(int lock_index, int tid, u64 correlation_id,
int event_type, Event *event);

void recordLog(LogLevel level, const char *message, size_t len);
Expand Down
5 changes: 4 additions & 1 deletion ddprof-lib/src/main/cpp/hotspot/vmStructs.inline.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,13 @@
#include "jvmThread.h"

VMThread* VMThread::current() {
return VMThread::cast(JVMThread::current());
assert(VM::isHotspot());
void* current = JVMThread::current();
return current != nullptr ? VMThread::cast(current) : nullptr;
}

VMThread* VMThread::fromJavaThread(JNIEnv* env, jthread thread) {
assert(VM::isHotspot());
assert(_eetop != nullptr);
if (_eetop != nullptr) {
jlong eetop = env->GetLongField(thread, _eetop);
Expand Down
96 changes: 94 additions & 2 deletions ddprof-lib/src/main/cpp/javaApi.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
#include "counters.h"
#include "common.h"
#include "engine.h"
#include "hotspot/vmStructs.h"
#include "hotspot/vmStructs.inline.h"
#include "incbin.h"
#include "jvmThread.h"
#include "os.h"
Expand Down Expand Up @@ -155,10 +155,13 @@ JavaCritical_com_datadoghq_profiler_JavaProfiler_filterThreadAdd0() {
slot_id = thread_filter->registerThread();
current->setFilterSlotId(slot_id);
}

if (unlikely(slot_id == -1)) {
return; // Failed to register thread
}
// Reset suppression state so a new thread occupying this slot does not inherit
// stale state from its predecessor. Must happen before add().
thread_filter->resetSlotRunState(slot_id);
thread_filter->add(tid, slot_id);
}

Expand Down Expand Up @@ -314,6 +317,95 @@ Java_com_datadoghq_profiler_JavaProfiler_recordQueueEnd0(
Profiler::instance()->recordQueueTime(tid, &event);
}

// JNI entry point called when the current thread enters a park.
//
// Always records the park on the ProfiledThread via parkEnter(). A blocked run
// (state CONDVAR_WAIT) is opened in the thread filter, and its token stored on
// the thread, only when all of the following hold:
//   - parkEnter() returned true,
//   - the thread filter is enabled,
//   - the thread owns a non-negative filter slot.
// Threads without a ProfiledThread are ignored.
extern "C" DLLEXPORT void JNICALL
Java_com_datadoghq_profiler_JavaProfiler_parkEnter0(JNIEnv *env, jclass unused) {
  ProfiledThread *self = ProfiledThread::current();
  if (self == nullptr) {
    return;
  }

  const bool is_first_park = self->parkEnter();
  ThreadFilter *filter = Profiler::instance()->threadFilter();
  if (!is_first_park || !filter->enabled()) {
    return;
  }

  const ThreadFilter::SlotID slot = self->filterSlotId();
  if (slot < 0) {
    return; // No filter slot: nothing to mark as blocked.
  }

  // Keep the token so the matching exit can close exactly this run.
  self->setParkBlockToken(
      filter->enterBlockedRun(slot, OSThreadState::CONDVAR_WAIT));
}

extern "C" DLLEXPORT void JNICALL
Java_com_datadoghq_profiler_JavaProfiler_parkExit0(
JNIEnv *env, jclass unused, jlong blocker, jlong unblockingSpanId) {
ProfiledThread *current = ProfiledThread::current();
if (current == nullptr) {
return;
}
u64 park_block_token = 0;
if (!current->parkExit(park_block_token) || park_block_token == 0) {
return;
}
ThreadFilter *tf = Profiler::instance()->threadFilter();
if (tf->enabled()) {

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Sphinx Review — HIGH] parkExit0 ignores the return value of current->parkExit() and always calls tf->exitBlockedRun(slot_id) unconditionally — even when the thread was not parked (FLAG_PARKED was not set) or when the slot is owned by blockEnter0. This clears an active block run it does not own, disabling suppression for the remaining sleep interval.

Suggestion: bool was_parked = current->parkExit(start_ticks, park_context); if (was_parked) tf->exitBlockedRun(slot_id); Alternatively store the enterBlockedRun token during parkEnter0 and use the generation-checked overload.

Confirmed by adversary.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

addressed in 6ece88d.

ThreadFilter::SlotID slot_id = ThreadFilter::tokenSlotId(park_block_token);
if (current->filterSlotId() == slot_id) {
tf->exitBlockedRun(slot_id, ThreadFilter::tokenGeneration(park_block_token));
}
}
}

// Translates a block state received from Java into an OSThreadState.
//
// Only the numeric value of OSThreadState::SLEEPING is accepted. For any other
// input `decoded` is set to OSThreadState::UNKNOWN and false is returned.
static bool decodeJavaBlockState(jint state, OSThreadState &decoded) {
  const bool is_sleeping =
      state == static_cast<jint>(OSThreadState::SLEEPING);
  decoded = is_sleeping ? OSThreadState::SLEEPING : OSThreadState::UNKNOWN;
  return is_sleeping;
}

// JNI entry point called when the current thread is about to block in a
// state reported by Java.
//
// Returns the token produced by ThreadFilter::enterBlockedRun(), to be handed
// back to blockExit0(). Returns 0 (no run opened) when:
//   - `state` is not accepted by decodeJavaBlockState(),
//   - the calling thread has no ProfiledThread,
//   - the thread filter is disabled,
//   - the thread has no filter slot (negative slot id).
extern "C" DLLEXPORT jlong JNICALL
Java_com_datadoghq_profiler_JavaProfiler_blockEnter0(
    JNIEnv *env, jclass unused, jint state) {
  jlong token = 0;

  OSThreadState block_state;
  if (decodeJavaBlockState(state, block_state)) {
    ProfiledThread *self = ProfiledThread::current();
    if (self != nullptr) {
      ThreadFilter *filter = Profiler::instance()->threadFilter();
      if (filter->enabled()) {
        const ThreadFilter::SlotID slot = self->filterSlotId();
        if (slot >= 0) {
          token = static_cast<jlong>(filter->enterBlockedRun(slot, block_state));
        }
      }
    }
  }

  return token;
}

// JNI entry point called when the current thread leaves a blocked state
// previously announced through blockEnter0().
//
// `token` is the value returned by blockEnter0(); 0 means no run was opened
// and the call is a no-op. The run is closed only if the slot encoded in the
// token is the slot currently held by the calling thread and the thread filter
// is enabled. The generation encoded in the token is forwarded to
// exitBlockedRun().
extern "C" DLLEXPORT void JNICALL
Java_com_datadoghq_profiler_JavaProfiler_blockExit0(
    JNIEnv *env, jclass unused, jlong token) {
  const u64 raw_token = static_cast<u64>(token);
  if (raw_token != 0) {
    ProfiledThread *self = ProfiledThread::current();
    if (self != nullptr) {
      const ThreadFilter::SlotID token_slot =
          ThreadFilter::tokenSlotId(raw_token);
      // Ignore tokens that refer to a slot this thread does not hold.
      if (self->filterSlotId() == token_slot) {
        ThreadFilter *filter = Profiler::instance()->threadFilter();
        if (filter->enabled()) {
          filter->exitBlockedRun(token_slot,
                                 ThreadFilter::tokenGeneration(raw_token));
        }
      }
    }
  }
}

extern "C" DLLEXPORT jlong JNICALL
Java_com_datadoghq_profiler_JavaProfiler_currentTicks0(JNIEnv *env,
jclass unused) {
Expand Down
4 changes: 3 additions & 1 deletion ddprof-lib/src/main/cpp/jfrMetadata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,9 @@ void JfrMetadata::initialize(
<< field("numExitedThreads", T_INT,
"Number of Exited Threads Before Handling Signal")
<< field("numPermissionDenied", T_INT,
"Number of Permission Denied Errors"))
"Number of Permission Denied Errors")
<< field("numSuppressedSampledRun", T_LONG,
"Signals suppressed by the wall-clock once-per-run filter"))

<< (type("datadog.ObjectSample", T_ALLOC, "Allocation sample")
<< category("Datadog", "Profiling")
Expand Down
Loading
Loading