diff --git a/ddprof-lib/src/main/cpp/callTraceHashTable.cpp b/ddprof-lib/src/main/cpp/callTraceHashTable.cpp index 042af9c75..50401ccf3 100644 --- a/ddprof-lib/src/main/cpp/callTraceHashTable.cpp +++ b/ddprof-lib/src/main/cpp/callTraceHashTable.cpp @@ -455,37 +455,36 @@ void CallTraceHashTable::putWithExistingId(CallTrace* source_trace, u64 weight) if (key_value == 0) { // Found empty slot - claim it atomically u64 expected = 0; - if (!__atomic_compare_exchange_n(&keys[slot], &expected, hash, false, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED)) { - // Another thread claimed it, try next slot - if (probe.hasNext()) { - slot = probe.next(); - continue; + if (__atomic_compare_exchange_n(&keys[slot], &expected, hash, false, __ATOMIC_ACQ_REL, __ATOMIC_RELAXED)) { + // Successfully claimed the slot + + // Create a copy of the source trace preserving its exact ID + const size_t header_size = sizeof(CallTrace) - sizeof(ASGCT_CallFrame); + const size_t total_size = header_size + source_trace->num_frames * sizeof(ASGCT_CallFrame); + void *memory = _allocator.alloc(total_size); + if (memory != nullptr) { + // Use placement new to invoke constructor in-place + CallTrace* copied_trace = new (memory) CallTrace(source_trace->truncated, source_trace->num_frames, source_trace->trace_id); + // memcpy safe since not in signal handler + memcpy(copied_trace->frames, source_trace->frames, source_trace->num_frames * sizeof(ASGCT_CallFrame)); + table->values()[slot].setTrace(copied_trace); + Counters::increment(CALLTRACE_STORAGE_BYTES, total_size); + Counters::increment(CALLTRACE_STORAGE_TRACES); + + // Increment table size + table->incSize(); + } else { + // Allocation failure - clear the key we claimed + __atomic_store_n(&keys[slot], 0, __ATOMIC_RELEASE); } + break; } - - // Create a copy of the source trace preserving its exact ID - const size_t header_size = sizeof(CallTrace) - sizeof(ASGCT_CallFrame); - const size_t total_size = header_size + source_trace->num_frames * sizeof(ASGCT_CallFrame); - void *memory = _allocator.alloc(total_size); - if (memory != nullptr) { - // Use placement new to invoke constructor in-place - CallTrace* copied_trace = new (memory) CallTrace(source_trace->truncated, source_trace->num_frames, source_trace->trace_id); - // memcpy safe since not in signal handler - memcpy(copied_trace->frames, source_trace->frames, source_trace->num_frames * sizeof(ASGCT_CallFrame)); - table->values()[slot].setTrace(copied_trace); - Counters::increment(CALLTRACE_STORAGE_BYTES, total_size); - Counters::increment(CALLTRACE_STORAGE_TRACES); - - // Increment table size - u32 new_size = table->incSize(); - probe.updateCapacity(new_size); - } else { - // Allocation failure - clear the key we claimed - __atomic_store_n(&keys[slot], 0, __ATOMIC_RELEASE); - } + } + if (probe.hasNext()) { + slot = probe.next(); + } else { + // No more slot, break out break; } - - slot = probe.next(); } } diff --git a/ddprof-lib/src/main/cpp/callTraceStorage.h b/ddprof-lib/src/main/cpp/callTraceStorage.h index 4735e18f2..9be8cb038 100644 --- a/ddprof-lib/src/main/cpp/callTraceStorage.h +++ b/ddprof-lib/src/main/cpp/callTraceStorage.h @@ -21,7 +21,6 @@ #include // Forward declarations -class CallTraceStorage; class CallTraceHashTable; // Liveness checker function type @@ -34,7 +33,7 @@ class CallTraceStorage { // Reserved trace ID for dropped samples due to contention // Real trace IDs are generated as (instance_id << 32) | slot, where instance_id starts from 1 // Any ID with 0 in upper 32 bits is guaranteed to not clash with real trace IDs - static const u64 DROPPED_TRACE_ID = 1ULL; + static constexpr u64 DROPPED_TRACE_ID = 1ULL; // Static dropped trace object that appears in JFR constant pool static CallTrace* getDroppedTrace(); diff --git a/ddprof-lib/src/test/cpp/test_callTraceStorage.cpp b/ddprof-lib/src/test/cpp/test_callTraceStorage.cpp index ecaea2f08..fdde65a4c 100644 --- a/ddprof-lib/src/test/cpp/test_callTraceStorage.cpp +++ b/ddprof-lib/src/test/cpp/test_callTraceStorage.cpp @@ -4,7 +4,7 @@ */ #include "gtest/gtest.h" -#include "callTraceStorage.h" +#include "../../main/cpp/callTraceStorage.h" #include #include #include @@ -664,3 +664,144 @@ TEST_F(CallTraceStorageTest, UseAfterFreeInProcessTraces) { EXPECT_GE(trace_count, NUM_TRACES) << "Should still find preserved traces"; }); } + +/** + * Regression test for the putWithExistingId infinite-loop bug. + * + * Before the fix: when all INITIAL_CAPACITY (65536) slots were occupied the + * probe loop called probe.next() without checking probe.hasNext(), so the + * prime-probe step cycled forever through already-occupied slots. + * + * After the fix: the hasNext() guard breaks the loop and the call returns, + * silently dropping the trace that could not be inserted. + * + * This test fills a fresh CallTraceHashTable beyond INITIAL_CAPACITY via + * putWithExistingId – which unlike put() never triggers table expansion – + * and asserts all calls complete within a hard timeout. + */ +TEST_F(CallTraceStorageTest, PutWithExistingIdNoInfiniteLoopWhenFull) { + static constexpr u32 INITIAL_CAPACITY = 65536; + + std::atomic completed{false}; + std::thread worker([&] { + void* mem = std::aligned_alloc(alignof(CallTraceHashTable), sizeof(CallTraceHashTable)); + if (mem == nullptr) { + completed = true; // Let the join path handle this; EXPECT below will report. + return; + } + + auto tbl = std::unique_ptr( + new (mem) CallTraceHashTable(), + [](CallTraceHashTable* p) { p->~CallTraceHashTable(); std::free(p); }); + tbl->setInstanceId(1); + + // Each iteration uses a distinct (bci, method_id) pair so calcHash produces + // a distinct hash, filling INITIAL_CAPACITY unique slots. Iterations past + // that point find no empty slot and must exit the probe via the hasNext() + // guard rather than cycling forever. + for (u32 i = 0; i < INITIAL_CAPACITY + 128; ++i) { + // Stack-allocate source trace; putWithExistingId copies the payload. + alignas(alignof(CallTrace)) char buf[sizeof(CallTrace)]; + CallTrace* src = new (buf) CallTrace(false, 1, static_cast(i) + 1); + src->frames[0].bci = static_cast(i); + src->frames[0].method_id = reinterpret_cast(static_cast(0x10000 + i)); + tbl->putWithExistingId(src, 1); + } + + completed = true; + }); + + // 10-second deadline; an un-fixed infinite loop would never set `completed`. + auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(10); + while (!completed.load() && std::chrono::steady_clock::now() < deadline) { + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + } + + bool ok = completed.load(); + if (!ok) { + worker.detach(); + } else { + worker.join(); + } + EXPECT_TRUE(ok) + << "putWithExistingId infinite-loop regression: did not terminate within 10 s " + "when the scratch table was full"; +} + +/** + * Integration test: processTraces preserves live traces across rotation cycles + * without hanging. + * + * Each processTraces() cycle copies preserved traces into the scratch table via + * putWithExistingId. This test verifies: + * 1. Every cycle completes promptly (no infinite loop via putWithExistingId). + * 2. Every trace flagged by the liveness checker survives to the next cycle. + * 3. The trace_id of a preserved trace is unchanged after preservation + * (putWithExistingId must keep the original ID, not generate a new one). + * 4. Frame content is intact after copying (detects use-after-free). + */ +TEST_F(CallTraceStorageTest, LivenessPreservationAcrossMultipleCycles) { + const int N = 200; + + // Insert N unique traces and record their IDs and frame values for later checks. + std::vector ids; + std::vector bcis; + ids.reserve(N); + bcis.reserve(N); + for (int i = 0; i < N; i++) { + ASGCT_CallFrame frame; + frame.bci = i + 1000; + frame.method_id = reinterpret_cast(static_cast(0x30000 + i)); + u64 id = storage->put(1, &frame, false, 1); + ASSERT_NE(id, CallTraceStorage::DROPPED_TRACE_ID) << "put() failed for trace " << i; + ids.push_back(id); + bcis.push_back(frame.bci); + } + + // Liveness checker marks every stored trace as live. + storage->registerLivenessChecker([&ids](std::unordered_set& buf) { + for (u64 id : ids) buf.insert(id); + }); + + const int CYCLES = 5; + for (int cycle = 0; cycle < CYCLES; cycle++) { + std::atomic done{false}; + std::thread t([&] { + storage->processTraces([&](const std::unordered_set& traces) { + // All N preserved traces must be present, plus the dropped sentinel. + EXPECT_GE(traces.size(), static_cast(N + 1)) + << "cycle " << cycle << ": too few traces"; + + for (int j = 0; j < N; j++) { + CallTrace* found = findTraceById(traces, ids[j]); + EXPECT_NE(found, nullptr) + << "cycle " << cycle << ": trace_id " << ids[j] << " not preserved"; + if (found == nullptr) continue; + + // Trace ID must be unchanged (putWithExistingId preserves the original ID). + EXPECT_EQ(found->trace_id, ids[j]) + << "cycle " << cycle << ": trace_id mutated during preservation"; + + // Frame content must be intact (detects use-after-free of freed chunks). + EXPECT_EQ(found->frames[0].bci, bcis[j]) + << "cycle " << cycle << ": frame bci corrupted for trace " << ids[j]; + } + }); + done = true; + }); + + auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(10); + while (!done.load() && std::chrono::steady_clock::now() < deadline) { + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + } + + bool ok = done.load(); + if (!ok) { + t.detach(); + FAIL() << "processTraces hung on cycle " << cycle + << " (possible infinite-loop regression in putWithExistingId)"; + return; + } + t.join(); + } +}