Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 27 additions & 28 deletions ddprof-lib/src/main/cpp/callTraceHashTable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -455,37 +455,36 @@ void CallTraceHashTable::putWithExistingId(CallTrace* source_trace, u64 weight)
if (key_value == 0) {
// Found empty slot - claim it atomically
u64 expected = 0;
if (!__atomic_compare_exchange_n(&keys[slot], &expected, hash, false, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED)) {
// Another thread claimed it, try next slot
if (probe.hasNext()) {
slot = probe.next();
continue;
if (__atomic_compare_exchange_n(&keys[slot], &expected, hash, false, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED)) {
// Successfully claimed the slot

// Create a copy of the source trace preserving its exact ID
const size_t header_size = sizeof(CallTrace) - sizeof(ASGCT_CallFrame);
const size_t total_size = header_size + source_trace->num_frames * sizeof(ASGCT_CallFrame);
void *memory = _allocator.alloc(total_size);
if (memory != nullptr) {
// Use placement new to invoke constructor in-place
CallTrace* copied_trace = new (memory) CallTrace(source_trace->truncated, source_trace->num_frames, source_trace->trace_id);
// memcpy safe since not in signal handler
memcpy(copied_trace->frames, source_trace->frames, source_trace->num_frames * sizeof(ASGCT_CallFrame));
table->values()[slot].setTrace(copied_trace);
Counters::increment(CALLTRACE_STORAGE_BYTES, total_size);
Counters::increment(CALLTRACE_STORAGE_TRACES);

// Increment table size
table->incSize();
} else {
// Allocation failure - clear the key we claimed
__atomic_store_n(&keys[slot], 0, __ATOMIC_RELEASE);
}
break;
}

// Create a copy of the source trace preserving its exact ID
const size_t header_size = sizeof(CallTrace) - sizeof(ASGCT_CallFrame);
const size_t total_size = header_size + source_trace->num_frames * sizeof(ASGCT_CallFrame);
void *memory = _allocator.alloc(total_size);
if (memory != nullptr) {
// Use placement new to invoke constructor in-place
CallTrace* copied_trace = new (memory) CallTrace(source_trace->truncated, source_trace->num_frames, source_trace->trace_id);
// memcpy safe since not in signal handler
memcpy(copied_trace->frames, source_trace->frames, source_trace->num_frames * sizeof(ASGCT_CallFrame));
table->values()[slot].setTrace(copied_trace);
Counters::increment(CALLTRACE_STORAGE_BYTES, total_size);
Counters::increment(CALLTRACE_STORAGE_TRACES);

// Increment table size
u32 new_size = table->incSize();
probe.updateCapacity(new_size);
} else {
// Allocation failure - clear the key we claimed
__atomic_store_n(&keys[slot], 0, __ATOMIC_RELEASE);
}
}
if (probe.hasNext()) {
slot = probe.next();
} else {
// No more slot, break out
break;
Comment on lines +485 to 487
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Record overflow when preservation exhausts slots

When the scratch table is full during liveness preservation, this new exit path just breaks out and drops the preserved trace. This can happen because putWithExistingId() never expands the scratch/standby table, while the active table can grow beyond the initial 65k slots; if more live traces are preserved than fit in scratch, the next collect() will emit only the entries that fit and no overflow sentinel because _overflow was not incremented. Please handle this the same way as put()'s full-probe path so callers can at least see the overflow trace instead of silently losing live trace continuity.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jbachorik Any reason it does not expand table here?

}

slot = probe.next();
}
}
3 changes: 1 addition & 2 deletions ddprof-lib/src/main/cpp/callTraceStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
#include <chrono>

// Forward declarations
class CallTraceStorage;
class CallTraceHashTable;

// Liveness checker function type
Expand All @@ -34,7 +33,7 @@ class CallTraceStorage {
// Reserved trace ID for dropped samples due to contention
// Real trace IDs are generated as (instance_id << 32) | slot, where instance_id starts from 1
// Any ID with 0 in upper 32 bits is guaranteed to not clash with real trace IDs
static const u64 DROPPED_TRACE_ID = 1ULL;
static constexpr u64 DROPPED_TRACE_ID = 1ULL;

// Static dropped trace object that appears in JFR constant pool
static CallTrace* getDroppedTrace();
Expand Down
143 changes: 142 additions & 1 deletion ddprof-lib/src/test/cpp/test_callTraceStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
*/

#include "gtest/gtest.h"
#include "callTraceStorage.h"
#include "../../main/cpp/callTraceStorage.h"
#include <vector>
#include <unordered_set>
#include <thread>
Expand Down Expand Up @@ -664,3 +664,144 @@ TEST_F(CallTraceStorageTest, UseAfterFreeInProcessTraces) {
EXPECT_GE(trace_count, NUM_TRACES) << "Should still find preserved traces";
});
}

/**
* Regression test for the putWithExistingId infinite-loop bug.
*
* Before the fix: when all INITIAL_CAPACITY (65536) slots were occupied the
* probe loop called probe.next() without checking probe.hasNext(), so the
* prime-probe step cycled forever through already-occupied slots.
*
* After the fix: the hasNext() guard breaks the loop and the call returns,
* silently dropping the trace that could not be inserted.
*
* This test fills a fresh CallTraceHashTable beyond INITIAL_CAPACITY via
* putWithExistingId – which unlike put() never triggers table expansion –
* and asserts all calls complete within a hard timeout.
*/
TEST_F(CallTraceStorageTest, PutWithExistingIdNoInfiniteLoopWhenFull) {
static constexpr u32 INITIAL_CAPACITY = 65536;

std::atomic<bool> completed{false};
std::thread worker([&] {
void* mem = std::aligned_alloc(alignof(CallTraceHashTable), sizeof(CallTraceHashTable));
if (mem == nullptr) {
completed = true; // Let the join path handle this; EXPECT below will report.
return;
}

auto tbl = std::unique_ptr<CallTraceHashTable, void(*)(CallTraceHashTable*)>(
new (mem) CallTraceHashTable(),
[](CallTraceHashTable* p) { p->~CallTraceHashTable(); std::free(p); });
tbl->setInstanceId(1);

// Each iteration uses a distinct (bci, method_id) pair so calcHash produces
// a distinct hash, filling INITIAL_CAPACITY unique slots. Iterations past
// that point find no empty slot and must exit the probe via the hasNext()
// guard rather than cycling forever.
for (u32 i = 0; i < INITIAL_CAPACITY + 128; ++i) {
// Stack-allocate source trace; putWithExistingId copies the payload.
alignas(alignof(CallTrace)) char buf[sizeof(CallTrace)];
CallTrace* src = new (buf) CallTrace(false, 1, static_cast<u64>(i) + 1);
src->frames[0].bci = static_cast<int>(i);
src->frames[0].method_id = reinterpret_cast<jmethodID>(static_cast<uintptr_t>(0x10000 + i));
tbl->putWithExistingId(src, 1);
}

completed = true;
});

// 10-second deadline; an un-fixed infinite loop would never set `completed`.
auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(10);
while (!completed.load() && std::chrono::steady_clock::now() < deadline) {
std::this_thread::sleep_for(std::chrono::milliseconds(50));
}

bool ok = completed.load();
if (!ok) {
worker.detach();
} else {
worker.join();
}
EXPECT_TRUE(ok)
<< "putWithExistingId infinite-loop regression: did not terminate within 10 s "
"when the scratch table was full";
}

/**
* Integration test: processTraces preserves live traces across rotation cycles
* without hanging.
*
* Each processTraces() cycle copies preserved traces into the scratch table via
* putWithExistingId. This test verifies:
* 1. Every cycle completes promptly (no infinite loop via putWithExistingId).
* 2. Every trace flagged by the liveness checker survives to the next cycle.
* 3. The trace_id of a preserved trace is unchanged after preservation
* (putWithExistingId must keep the original ID, not generate a new one).
* 4. Frame content is intact after copying (detects use-after-free).
*/
TEST_F(CallTraceStorageTest, LivenessPreservationAcrossMultipleCycles) {
const int N = 200;

// Insert N unique traces and record their IDs and frame values for later checks.
std::vector<u64> ids;
std::vector<int> bcis;
ids.reserve(N);
bcis.reserve(N);
for (int i = 0; i < N; i++) {
ASGCT_CallFrame frame;
frame.bci = i + 1000;
frame.method_id = reinterpret_cast<jmethodID>(static_cast<uintptr_t>(0x30000 + i));
u64 id = storage->put(1, &frame, false, 1);
ASSERT_NE(id, CallTraceStorage::DROPPED_TRACE_ID) << "put() failed for trace " << i;
ids.push_back(id);
bcis.push_back(frame.bci);
}

// Liveness checker marks every stored trace as live.
storage->registerLivenessChecker([&ids](std::unordered_set<u64>& buf) {
for (u64 id : ids) buf.insert(id);
});

const int CYCLES = 5;
for (int cycle = 0; cycle < CYCLES; cycle++) {
std::atomic<bool> done{false};
std::thread t([&] {
storage->processTraces([&](const std::unordered_set<CallTrace*>& traces) {
// All N preserved traces must be present, plus the dropped sentinel.
EXPECT_GE(traces.size(), static_cast<size_t>(N + 1))
<< "cycle " << cycle << ": too few traces";

for (int j = 0; j < N; j++) {
CallTrace* found = findTraceById(traces, ids[j]);
EXPECT_NE(found, nullptr)
<< "cycle " << cycle << ": trace_id " << ids[j] << " not preserved";
if (found == nullptr) continue;

// Trace ID must be unchanged (putWithExistingId preserves the original ID).
EXPECT_EQ(found->trace_id, ids[j])
<< "cycle " << cycle << ": trace_id mutated during preservation";

// Frame content must be intact (detects use-after-free of freed chunks).
EXPECT_EQ(found->frames[0].bci, bcis[j])
<< "cycle " << cycle << ": frame bci corrupted for trace " << ids[j];
}
});
done = true;
});

auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(10);
while (!done.load() && std::chrono::steady_clock::now() < deadline) {
std::this_thread::sleep_for(std::chrono::milliseconds(50));
}

bool ok = done.load();
if (!ok) {
t.detach();
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Avoid detaching a worker that still uses fixture state

If this timeout ever fires (the regression this test is meant to catch, or just a slow sanitizer/CI run), t.detach() lets the lambda continue executing storage->processTraces() and then write done while the test returns and TearDown() destroys storage and the stack locals captured by reference. That turns the intended timeout failure into a use-after-free/crash or a background thread corrupting later tests; use a lifetime-safe watchdog strategy instead of detaching a thread that captured fixture state.

Useful? React with 👍 / 👎.

FAIL() << "processTraces hung on cycle " << cycle
<< " (possible infinite-loop regression in putWithExistingId)";
return;
}
t.join();
}
}
Loading