From 30e565b96975788f963e47ca3de233e1e90850c7 Mon Sep 17 00:00:00 2001 From: PastaClaw Date: Thu, 2 Jul 2026 09:44:49 -0500 Subject: [PATCH 1/5] bench: measure DKG payload deserialization cost Adds a focused bench that constructs representative QCONTRIB and QPCOMMITMENT wire payloads and compares the current double-deserialize intake pattern (structural pre-check on a copy at intake + typed deserialize on the DKG worker) against a single-deserialize pattern (intake deserializes once and hands the typed object to the worker). Local numbers on Apple Silicon (release build, -min-time default): DKG_QCONTRIB_DoubleDeserialize ~8.5 ms/op DKG_QCONTRIB_SingleDeserialize ~4.75 ms/op (~1.8x) DKG_QPCOMMITMENT_DoubleDeserialize ~910 us/op DKG_QPCOMMITMENT_SingleDeserialize ~423 us/op (~2.1x) Costs are dominated by BLS G1/G2 point decompression in the vvec and signatures, so a single decode per payload avoids meaningful work. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/Makefile.bench.include | 1 + src/bench/dkg_deserialize.cpp | 175 ++++++++++++++++++++++++++++++++++ 2 files changed, 176 insertions(+) create mode 100644 src/bench/dkg_deserialize.cpp diff --git a/src/Makefile.bench.include b/src/Makefile.bench.include index be7bfe2d2dcc..05638574634d 100644 --- a/src/Makefile.bench.include +++ b/src/Makefile.bench.include @@ -31,6 +31,7 @@ bench_bench_dash_SOURCES = \ bench/crypto_hash.cpp \ bench/data.cpp \ bench/data.h \ + bench/dkg_deserialize.cpp \ bench/duplicate_inputs.cpp \ bench/ecdsa.cpp \ bench/ellswift.cpp \ diff --git a/src/bench/dkg_deserialize.cpp b/src/bench/dkg_deserialize.cpp new file mode 100644 index 000000000000..f9ccd9ab5c22 --- /dev/null +++ b/src/bench/dkg_deserialize.cpp @@ -0,0 +1,175 @@ +// Copyright (c) 2026 The Dash Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +namespace { + +// Build a well-formed serialized QCONTRIB payload for the given quorum size / +// threshold. The ciphertext blobs are opaque bytes on the wire (only the +// enclosing IES header is a BLS pubkey), so we can synthesise them without +// running IES encryption. Vvec entries and the outer signature are real BLS +// points so the deserializer performs the same G1/G2 decompression as it would +// in production. +CDataStream BuildSerializedContribution(int quorumSize, int threshold) +{ + llmq::CDKGContribution qc; + qc.llmqType = Consensus::LLMQType::LLMQ_50_60; + qc.quorumHash = GetRandHash(); + qc.proTxHash = GetRandHash(); + + auto vvec = std::make_shared>(); + vvec->reserve(threshold); + for (int i = 0; i < threshold; ++i) { + CBLSSecretKey sk; + sk.MakeNewKey(); + vvec->emplace_back(sk.GetPublicKey()); + } + qc.vvec = std::move(vvec); + + auto contributions = std::make_shared>(); + { + CBLSSecretKey sk; + sk.MakeNewKey(); + contributions->ephemeralPubKey = sk.GetPublicKey(); + } + contributions->ivSeed = GetRandHash(); + contributions->blobs.resize(quorumSize); + // Each IES blob wraps a CBLSSecretKey (32 bytes) plus AEAD framing overhead + // (~16 bytes tag). Sixty-four bytes matches what the real DKG produces on + // the wire closely enough for deserialization cost. Blob contents are + // opaque bytes on the wire (no BLS decoding); a deterministic fill keeps + // the benchmark reproducible. + FastRandomContext rng{true}; + for (auto& blob : contributions->blobs) { + blob = rng.randbytes(64); + } + qc.contributions = std::move(contributions); + + { + CBLSSecretKey sk; + sk.MakeNewKey(); + qc.sig = sk.Sign(GetRandHash(), false); + } + + CDataStream ds(SER_NETWORK, PROTOCOL_VERSION); + ds << qc; + return ds; +} + +CDataStream BuildSerializedPrematureCommitment(int quorumSize) +{ + llmq::CDKGPrematureCommitment qc; + qc.llmqType = Consensus::LLMQType::LLMQ_50_60; + qc.quorumHash = GetRandHash(); + qc.proTxHash = GetRandHash(); + qc.validMembers.assign(quorumSize, true); + { + CBLSSecretKey sk; + sk.MakeNewKey(); + qc.quorumPublicKey = sk.GetPublicKey(); + } + qc.quorumVvecHash = GetRandHash(); + { + CBLSSecretKey sk1, sk2; + sk1.MakeNewKey(); + sk2.MakeNewKey(); + qc.quorumSig = sk1.Sign(GetRandHash(), false); + qc.sig = sk2.Sign(GetRandHash(), false); + } + CDataStream ds(SER_NETWORK, PROTOCOL_VERSION); + ds << qc; + return ds; +} + +} // namespace + +// Simulates the current intake path: deserialize a copy on the network thread +// for structural pre-validation, then deserialize again on the DKG worker to +// obtain the typed object. +static void DKG_QCONTRIB_DoubleDeserialize(benchmark::Bench& bench) +{ + const CDataStream wire = BuildSerializedContribution(50, 30); + + bench.minEpochIterations(200).run([&] { + // Structural check on a copy (mirrors CheckDKGMessageStructure). + { + CDataStream s(wire); + llmq::CDKGContribution qc; + s >> qc; + ankerl::nanobench::doNotOptimizeAway(qc.vvec->size()); + } + // Worker deserialize (mirrors PopAndDeserializeMessages). + { + CDataStream s(wire); + llmq::CDKGContribution qc; + s >> qc; + ankerl::nanobench::doNotOptimizeAway(qc.contributions->blobs.size()); + } + }); +} + +// Simulates the proposed intake path: deserialize once at intake, retain the +// typed object, no worker-side deserialize. +static void DKG_QCONTRIB_SingleDeserialize(benchmark::Bench& bench) +{ + const CDataStream wire = BuildSerializedContribution(50, 30); + + bench.minEpochIterations(200).run([&] { + CDataStream s(wire); + auto qc = std::make_shared(); + s >> *qc; + ankerl::nanobench::doNotOptimizeAway(qc->vvec->size()); + ankerl::nanobench::doNotOptimizeAway(qc->contributions->blobs.size()); + }); +} + +static void DKG_QPCOMMITMENT_DoubleDeserialize(benchmark::Bench& bench) +{ + const CDataStream wire = BuildSerializedPrematureCommitment(50); + + bench.minEpochIterations(2000).run([&] { + { + CDataStream s(wire); + llmq::CDKGPrematureCommitment qc; + s >> qc; + ankerl::nanobench::doNotOptimizeAway(qc.validMembers.size()); + } + { + CDataStream s(wire); + llmq::CDKGPrematureCommitment qc; + s >> qc; + ankerl::nanobench::doNotOptimizeAway(qc.validMembers.size()); + } + }); +} + +static void DKG_QPCOMMITMENT_SingleDeserialize(benchmark::Bench& bench) +{ + const CDataStream wire = BuildSerializedPrematureCommitment(50); + + bench.minEpochIterations(2000).run([&] { + CDataStream s(wire); + auto qc = std::make_shared(); + s >> *qc; + ankerl::nanobench::doNotOptimizeAway(qc->validMembers.size()); + }); +} + +BENCHMARK(DKG_QCONTRIB_DoubleDeserialize, benchmark::PriorityLevel::HIGH) +BENCHMARK(DKG_QCONTRIB_SingleDeserialize, benchmark::PriorityLevel::HIGH) +BENCHMARK(DKG_QPCOMMITMENT_DoubleDeserialize, benchmark::PriorityLevel::HIGH) +BENCHMARK(DKG_QPCOMMITMENT_SingleDeserialize, benchmark::PriorityLevel::HIGH) From c7e3cad4935c2e957f8806b8583716d82ca842da Mon Sep 17 00:00:00 2001 From: PastaClaw Date: Thu, 2 Jul 2026 09:50:17 -0500 Subject: [PATCH 2/5] perf: deserialize DKG network messages once instead of twice CheckDKGMessageStructure previously deserialized a copy of the pushed DKG payload on the network thread to enforce param-derived structural bounds, then the original bytes were retained on the pending queue and deserialized again on the DKG worker thread. For QCONTRIB the BLS point decompression is the dominant cost, so paying it twice per message is meaningful (see bench: roughly 2x wall-clock for QCONTRIB and QPCOMMITMENT before/after this change). Intake now deserializes the payload once, runs the same param-derived bound check on the typed object, and enqueues an already-typed std::shared_ptr. The DKG worker no longer deserializes on pop. CDKGPendingMessages is turned into a class template parameterised on the message type so each queue stores its typed message directly. Preserved: - Oversize rejection with peer misbehavior score 100 before any deserialization or retention. - Malformed-payload rejection with score 100 (a deserialize throw or a bound violation triggers the same ban path as before). - Inventory hash: still computed over the raw wire bytes at intake, matching what peers compute for the same payload; AlreadyHave / HasSeen / PeerEraseObjectRequest semantics unchanged. - EnqueueOwn still serializes with SER_NETWORK/PROTOCOL_VERSION and hashes those bytes, matching the old serialize-then-hash form. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/llmq/dkgsessionhandler.cpp | 49 ---------- src/llmq/dkgsessionhandler.h | 100 ++++++++++++-------- src/llmq/net_dkg.cpp | 165 ++++++++++++++++++--------------- 3 files changed, 152 insertions(+), 162 deletions(-) diff --git a/src/llmq/dkgsessionhandler.cpp b/src/llmq/dkgsessionhandler.cpp index c9258ff353f9..a8bf4c84110e 100644 --- a/src/llmq/dkgsessionhandler.cpp +++ b/src/llmq/dkgsessionhandler.cpp @@ -4,9 +4,6 @@ #include -#include -#include - #include namespace llmq { @@ -25,52 +22,6 @@ CDKGSessionHandler::CDKGSessionHandler(const Consensus::LLMQParams& _params) : CDKGSessionHandler::~CDKGSessionHandler() = default; -void CDKGPendingMessages::PushPendingMessage(NodeId from, std::shared_ptr pm, const uint256& hash) -{ - LOCK(cs_messages); - - if (messagesPerNode[from] >= maxMessagesPerNode) { - // TODO ban? - LogPrint(BCLog::LLMQ_DKG, "CDKGPendingMessages::%s -- too many messages, peer=%d\n", __func__, from); - return; - } - messagesPerNode[from]++; - - if (!seenMessages.emplace(hash).second) { - LogPrint(BCLog::LLMQ_DKG, "CDKGPendingMessages::%s -- already seen %s, peer=%d\n", __func__, hash.ToString(), from); - return; - } - - pendingMessages.emplace_back(std::make_pair(from, std::move(pm))); -} - -std::list CDKGPendingMessages::PopPendingMessages(size_t maxCount) -{ - LOCK(cs_messages); - - std::list ret; - while (!pendingMessages.empty() && ret.size() < maxCount) { - ret.emplace_back(std::move(pendingMessages.front())); - pendingMessages.pop_front(); - } - - return ret; -} - -bool CDKGPendingMessages::HasSeen(const uint256& hash) const -{ - LOCK(cs_messages); - return seenMessages.count(hash) != 0; -} - -void CDKGPendingMessages::Clear() -{ - LOCK(cs_messages); - pendingMessages.clear(); - messagesPerNode.clear(); - seenMessages.clear(); -} - void CDKGSessionHandler::ClearPendingMessages() { pendingContributions.Clear(); diff --git a/src/llmq/dkgsessionhandler.h b/src/llmq/dkgsessionhandler.h index be55bfcbaa8a..7ec8a3c6bee0 100644 --- a/src/llmq/dkgsessionhandler.h +++ b/src/llmq/dkgsessionhandler.h @@ -5,19 +5,21 @@ #ifndef BITCOIN_LLMQ_DKGSESSIONHANDLER_H #define BITCOIN_LLMQ_DKGSESSIONHANDLER_H +#include #include // for NodeId #include +#include +#include #include #include #include #include #include +#include #include -class CDataStream; class CBlockIndex; -class uint256; namespace Consensus { struct LLMQParams; @@ -42,66 +44,84 @@ enum class QuorumPhase { }; /** - * Acts as a FIFO queue for incoming DKG messages. The reason we need this is that deserialization of these messages - * is too slow to be processed in the main message handler thread. So, instead of processing them directly from the - * main handler thread, we push them into a CDKGPendingMessages object and later pop+deserialize them in the DKG phase - * handler thread. + * Acts as a FIFO queue for incoming DKG messages. Deserialization of DKG + * payloads (which decompresses BLS G1/G2 points and IES blobs) is too slow + * to run on the main message handler thread, so the intake path performs + * that decode once and hands the typed object to this queue; the DKG phase + * handler thread later pops the already-deserialized message. * - * Each message type has it's own instance of this class. + * Each message type has its own instance of this class. */ +template class CDKGPendingMessages { public: - using BinaryMessage = std::pair>; + using PendingMessage = std::pair>; private: const size_t maxMessagesPerNode; mutable Mutex cs_messages; - std::list pendingMessages GUARDED_BY(cs_messages); + std::list pendingMessages GUARDED_BY(cs_messages); std::map messagesPerNode GUARDED_BY(cs_messages); Uint256HashSet seenMessages GUARDED_BY(cs_messages); public: explicit CDKGPendingMessages(size_t _maxMessagesPerNode) : - maxMessagesPerNode(_maxMessagesPerNode) {}; + maxMessagesPerNode(_maxMessagesPerNode) {} /** - * Enqueue a serialized DKG message under @p from with content hash @p hash. - * Caller is responsible for hashing the payload and (for real peers) - * routing the erase-request to PeerManager. Drops the message silently on + * Enqueue a typed DKG message under @p from with content hash @p hash. + * Caller is responsible for deserializing the payload, computing the + * inventory hash over the raw wire bytes, and (for real peers) routing + * the erase-request to PeerManager. Drops the message silently on * per-node capacity overflow or duplicate hash. */ - void PushPendingMessage(NodeId from, std::shared_ptr pm, const uint256& hash) - EXCLUSIVE_LOCKS_REQUIRED(!cs_messages); - - std::list PopPendingMessages(size_t maxCount) EXCLUSIVE_LOCKS_REQUIRED(!cs_messages); - bool HasSeen(const uint256& hash) const EXCLUSIVE_LOCKS_REQUIRED(!cs_messages); - void Clear() EXCLUSIVE_LOCKS_REQUIRED(!cs_messages); - - // Might return nullptr messages, which indicates that deserialization failed for some reason - template - std::vector>> PopAndDeserializeMessages(size_t maxCount) + void PushPendingMessage(NodeId from, std::shared_ptr msg, const uint256& hash) EXCLUSIVE_LOCKS_REQUIRED(!cs_messages) { - auto binaryMessages = PopPendingMessages(maxCount); - if (binaryMessages.empty()) { - return {}; + LOCK(cs_messages); + + if (messagesPerNode[from] >= maxMessagesPerNode) { + // TODO ban? + LogPrint(BCLog::LLMQ_DKG, "CDKGPendingMessages::%s -- too many messages, peer=%d\n", __func__, from); + return; } + messagesPerNode[from]++; - std::vector>> ret; - ret.reserve(binaryMessages.size()); - for (const auto& bm : binaryMessages) { - auto msg = std::make_shared(); - try { - *bm.second >> *msg; - } catch (...) { - msg = nullptr; - } - ret.emplace_back(std::make_pair(bm.first, std::move(msg))); + if (!seenMessages.emplace(hash).second) { + LogPrint(BCLog::LLMQ_DKG, "CDKGPendingMessages::%s -- already seen %s, peer=%d\n", __func__, + hash.ToString(), from); + return; } + pendingMessages.emplace_back(from, std::move(msg)); + } + + std::vector PopPendingMessages(size_t maxCount) EXCLUSIVE_LOCKS_REQUIRED(!cs_messages) + { + LOCK(cs_messages); + std::vector ret; + ret.reserve(std::min(maxCount, pendingMessages.size())); + while (!pendingMessages.empty() && ret.size() < maxCount) { + ret.emplace_back(std::move(pendingMessages.front())); + pendingMessages.pop_front(); + } return ret; } + + bool HasSeen(const uint256& hash) const EXCLUSIVE_LOCKS_REQUIRED(!cs_messages) + { + LOCK(cs_messages); + return seenMessages.count(hash) != 0; + } + + void Clear() EXCLUSIVE_LOCKS_REQUIRED(!cs_messages) + { + LOCK(cs_messages); + pendingMessages.clear(); + messagesPerNode.clear(); + seenMessages.clear(); + } }; /** @@ -113,10 +133,10 @@ class CDKGSessionHandler const Consensus::LLMQParams& params; // Do not guard these, they protect their internals themselves - CDKGPendingMessages pendingContributions; - CDKGPendingMessages pendingComplaints; - CDKGPendingMessages pendingJustifications; - CDKGPendingMessages pendingPrematureCommitments; + CDKGPendingMessages pendingContributions; + CDKGPendingMessages pendingComplaints; + CDKGPendingMessages pendingJustifications; + CDKGPendingMessages pendingPrematureCommitments; public: explicit CDKGSessionHandler(const Consensus::LLMQParams& _params); diff --git a/src/llmq/net_dkg.cpp b/src/llmq/net_dkg.cpp index 627761b454bb..2a656b2abf3f 100644 --- a/src/llmq/net_dkg.cpp +++ b/src/llmq/net_dkg.cpp @@ -71,43 +71,43 @@ size_t MaxDKGMessageSize(std::string_view msg_type, const Consensus::LLMQParams& return cap < HARD_CEILING ? cap : HARD_CEILING; } -// Cheap, param-only structural validation of a pushed DKG message, run at intake -// before retention. Deserializes a COPY of the payload (leaving the caller's bytes -// intact for the pending queue and its inventory hash) and checks only safe upper -// bounds derived from quorum params: no member-list lookup and no signature -// verification, which remain on the DKG worker thread. Deserializing the copy does -// decompress the BLS points carried in the payload, but that work is bounded by -// the size cap applied just before this check. Rejects malformed or clearly -// oversized payloads before retention. -bool CheckDKGMessageStructure(std::string_view msg_type, const CDataStream& vRecv, const Consensus::LLMQParams& params) +// Param-only structural validation of a typed DKG message: checks only safe +// upper bounds derived from quorum params (member-list lookup and signature +// verification remain on the DKG worker thread). Called at intake after the +// single wire deserialize so a subsequent PopPendingMessages hands back an +// already-validated object. +template +bool CheckDKGMessageStructure(const Message& msg, const Consensus::LLMQParams& params); + +template <> +bool CheckDKGMessageStructure(const CDKGContribution& qc, const Consensus::LLMQParams& params) { const size_t size = params.size > 0 ? static_cast(params.size) : 0; const size_t threshold = params.threshold > 0 ? static_cast(params.threshold) : 0; - try { - CDataStream s(vRecv); // copy; deserialization does not advance the caller's stream - if (msg_type == NetMsgType::QCONTRIB) { - CDKGContribution qc; - s >> qc; - return qc.vvec != nullptr && qc.vvec->size() == threshold && - qc.contributions != nullptr && qc.contributions->blobs.size() <= size; - } else if (msg_type == NetMsgType::QCOMPLAINT) { - CDKGComplaint qc; - s >> qc; - return qc.badMembers.size() == qc.complainForMembers.size() && - qc.badMembers.size() <= size; - } else if (msg_type == NetMsgType::QJUSTIFICATION) { - CDKGJustification qj; - s >> qj; - return qj.contributions.size() <= size; - } else if (msg_type == NetMsgType::QPCOMMITMENT) { - CDKGPrematureCommitment qc; - s >> qc; - return qc.validMembers.size() <= size; - } - return false; - } catch (const std::exception&) { - return false; - } + return qc.vvec != nullptr && qc.vvec->size() == threshold && + qc.contributions != nullptr && qc.contributions->blobs.size() <= size; +} + +template <> +bool CheckDKGMessageStructure(const CDKGComplaint& qc, const Consensus::LLMQParams& params) +{ + const size_t size = params.size > 0 ? static_cast(params.size) : 0; + return qc.badMembers.size() == qc.complainForMembers.size() && qc.badMembers.size() <= size; +} + +template <> +bool CheckDKGMessageStructure(const CDKGJustification& qj, const Consensus::LLMQParams& params) +{ + const size_t size = params.size > 0 ? static_cast(params.size) : 0; + return qj.contributions.size() <= size; +} + +template <> +bool CheckDKGMessageStructure(const CDKGPrematureCommitment& qc, + const Consensus::LLMQParams& params) +{ + const size_t size = params.size > 0 ? static_cast(params.size) : 0; + return qc.validMembers.size() <= size; } // returns a set of NodeIds which sent invalid messages @@ -252,21 +252,26 @@ void RelayInvToParticipants(const CDKGSession& session, const CConnman& connman, } template -void EnqueueOwn(CDKGPendingMessages& pending, const Message& msg) +void EnqueueOwn(CDKGPendingMessages& pending, const Message& msg) { + // Own messages skip the wire path but still populate the pending queue so + // the DKG worker sees them alongside peer messages. The inventory hash is + // computed over the serialized form so it matches what remote peers would + // compute for the same message. + auto pending_msg = std::make_shared(msg); CDataStream ds(SER_NETWORK, PROTOCOL_VERSION); - ds << msg; - auto pm = std::make_shared(std::move(ds)); + ds << *pending_msg; CHashWriter hw(SER_GETHASH, 0); - hw.write(AsWritableBytes(Span{*pm})); - pending.PushPendingMessage(/*from=*/-1, std::move(pm), hw.GetHash()); + hw.write(AsWritableBytes(Span{ds})); + pending.PushPendingMessage(/*from=*/-1, std::move(pending_msg), hw.GetHash()); } template -bool ProcessPendingMessageBatch(const CConnman& connman, CDKGSession& session, CDKGPendingMessages& pendingMessages, - PeerManagerInternal& peerman, size_t maxCount) +bool ProcessPendingMessageBatch(const CConnman& connman, CDKGSession& session, + CDKGPendingMessages& pendingMessages, PeerManagerInternal& peerman, + size_t maxCount) { - auto msgs = pendingMessages.PopAndDeserializeMessages(maxCount); + auto msgs = pendingMessages.PopPendingMessages(maxCount); if (msgs.empty()) { return false; } @@ -276,11 +281,9 @@ bool ProcessPendingMessageBatch(const CConnman& connman, CDKGSession& session, C for (const auto& p : msgs) { const NodeId& nodeId = p.first; - if (!p.second) { - LogPrint(BCLog::LLMQ_DKG, "%s -- failed to deserialize message, peer=%d\n", __func__, nodeId); - peerman.PeerMisbehaving(nodeId, 100); - continue; - } + // Intake deserializes once, so a null typed message never reaches the + // worker: malformed payloads are rejected before enqueue. + Assume(p.second != nullptr); bool ban = false; if (!session.PreVerifyMessage(*p.second, ban)) { if (ban) { @@ -455,49 +458,65 @@ void NetDKG::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDataStre return; } - // Cheap structural pre-validation before retention. Validates a copy so the - // original bytes (and their inventory hash) are preserved for the worker. - if (!CheckDKGMessageStructure(msg_type, vRecv, llmq_params)) { + // Inventory hash is computed over the raw wire bytes before we consume + // them, matching what a peer would compute for the same payload. + CHashWriter hw(SER_GETHASH, 0); + hw.write(AsWritableBytes(Span{vRecv})); + const uint256 hash = hw.GetHash(); + + // Deserialize once at intake and enforce the same param-derived structural + // bounds that the old copy-based check enforced. On failure (malformed + // wire form or bound violation) the peer is banned exactly as before. + int inv_type = 0; + std::shared_ptr qContribution; + std::shared_ptr qComplaint; + std::shared_ptr qJustification; + std::shared_ptr qPCommitment; + try { + if (msg_type == NetMsgType::QCONTRIB) { + qContribution = std::make_shared(); + vRecv >> *qContribution; + if (!CheckDKGMessageStructure(*qContribution, llmq_params)) throw std::runtime_error("bounds"); + inv_type = MSG_QUORUM_CONTRIB; + } else if (msg_type == NetMsgType::QCOMPLAINT) { + qComplaint = std::make_shared(); + vRecv >> *qComplaint; + if (!CheckDKGMessageStructure(*qComplaint, llmq_params)) throw std::runtime_error("bounds"); + inv_type = MSG_QUORUM_COMPLAINT; + } else if (msg_type == NetMsgType::QJUSTIFICATION) { + qJustification = std::make_shared(); + vRecv >> *qJustification; + if (!CheckDKGMessageStructure(*qJustification, llmq_params)) throw std::runtime_error("bounds"); + inv_type = MSG_QUORUM_JUSTIFICATION; + } else if (msg_type == NetMsgType::QPCOMMITMENT) { + qPCommitment = std::make_shared(); + vRecv >> *qPCommitment; + if (!CheckDKGMessageStructure(*qPCommitment, llmq_params)) throw std::runtime_error("bounds"); + inv_type = MSG_QUORUM_PREMATURE_COMMITMENT; + } + } catch (const std::exception&) { m_peer_manager->PeerMisbehaving(pfrom.GetId(), 100, "malformed DKG message"); return; } - - int inv_type = 0; - if (msg_type == NetMsgType::QCONTRIB) - inv_type = MSG_QUORUM_CONTRIB; - else if (msg_type == NetMsgType::QCOMPLAINT) - inv_type = MSG_QUORUM_COMPLAINT; - else if (msg_type == NetMsgType::QJUSTIFICATION) - inv_type = MSG_QUORUM_JUSTIFICATION; - else if (msg_type == NetMsgType::QPCOMMITMENT) - inv_type = MSG_QUORUM_PREMATURE_COMMITMENT; Assume(inv_type != 0); // guarded by the early-return above - auto pm = std::make_shared(std::move(vRecv)); - CHashWriter hw(SER_GETHASH, 0); - hw.write(AsWritableBytes(Span{*pm})); - const uint256 hash = hw.GetHash(); - const NodeId from = pfrom.GetId(); const bool dispatched = m_qdkgsman.DoForHandler({llmqType, quorumIndex}, [&](CDKGSessionHandler& handler) { - CDKGPendingMessages* pending = nullptr; + WITH_LOCK(::cs_main, m_peer_manager->PeerEraseObjectRequest(from, CInv{static_cast(inv_type), hash})); switch (inv_type) { case MSG_QUORUM_CONTRIB: - pending = &handler.pendingContributions; + handler.pendingContributions.PushPendingMessage(from, std::move(qContribution), hash); break; case MSG_QUORUM_COMPLAINT: - pending = &handler.pendingComplaints; + handler.pendingComplaints.PushPendingMessage(from, std::move(qComplaint), hash); break; case MSG_QUORUM_JUSTIFICATION: - pending = &handler.pendingJustifications; + handler.pendingJustifications.PushPendingMessage(from, std::move(qJustification), hash); break; case MSG_QUORUM_PREMATURE_COMMITMENT: - pending = &handler.pendingPrematureCommitments; + handler.pendingPrematureCommitments.PushPendingMessage(from, std::move(qPCommitment), hash); break; } - Assume(pending != nullptr); - WITH_LOCK(::cs_main, m_peer_manager->PeerEraseObjectRequest(from, CInv{static_cast(inv_type), hash})); - pending->PushPendingMessage(from, std::move(pm), hash); }); if (!dispatched) { LogPrintf("NetDKG -- no session handlers for quorumIndex [%d]\n", quorumIndex); From e8d9b6d803180c73f8f1f9aef7de9cbdd4d4b1e6 Mon Sep 17 00:00:00 2001 From: PastaClaw Date: Thu, 2 Jul 2026 10:45:49 -0500 Subject: [PATCH 3/5] fix: dedupe DKG pending messages before charging per-node quota Duplicate hashes previously incremented messagesPerNode before the seen-set check, so a peer that resent the same message could exhaust its quota with duplicates and starve legitimate new messages. Check the seen-set first and only charge the quota after we know we will actually enqueue. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/llmq/dkgsessionhandler.h | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/src/llmq/dkgsessionhandler.h b/src/llmq/dkgsessionhandler.h index 7ec8a3c6bee0..885d7bda233a 100644 --- a/src/llmq/dkgsessionhandler.h +++ b/src/llmq/dkgsessionhandler.h @@ -81,6 +81,14 @@ class CDKGPendingMessages { LOCK(cs_messages); + // Check duplicates before charging the per-node quota so a peer that + // resends the same hash cannot exhaust its budget with dupes. + if (!seenMessages.emplace(hash).second) { + LogPrint(BCLog::LLMQ_DKG, "CDKGPendingMessages::%s -- already seen %s, peer=%d\n", __func__, + hash.ToString(), from); + return; + } + if (messagesPerNode[from] >= maxMessagesPerNode) { // TODO ban? LogPrint(BCLog::LLMQ_DKG, "CDKGPendingMessages::%s -- too many messages, peer=%d\n", __func__, from); @@ -88,12 +96,6 @@ class CDKGPendingMessages } messagesPerNode[from]++; - if (!seenMessages.emplace(hash).second) { - LogPrint(BCLog::LLMQ_DKG, "CDKGPendingMessages::%s -- already seen %s, peer=%d\n", __func__, - hash.ToString(), from); - return; - } - pendingMessages.emplace_back(from, std::move(msg)); } From 029fcc228b30c841b967358fceba34dad41f94e8 Mon Sep 17 00:00:00 2001 From: PastaClaw Date: Thu, 2 Jul 2026 10:45:54 -0500 Subject: [PATCH 4/5] bench: mirror the old worker's make_shared allocation in double-deserialize path The old DKG worker (PopAndDeserializeMessages) allocated the typed object via std::make_shared() before deserializing into it. Mirror that shape in the double-deserialize benchmark's worker block so the before/after comparison isolates the deserialize cost rather than also picking up a stack-vs-heap allocation delta. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/bench/dkg_deserialize.cpp | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/src/bench/dkg_deserialize.cpp b/src/bench/dkg_deserialize.cpp index f9ccd9ab5c22..a013e881a4ef 100644 --- a/src/bench/dkg_deserialize.cpp +++ b/src/bench/dkg_deserialize.cpp @@ -112,12 +112,13 @@ static void DKG_QCONTRIB_DoubleDeserialize(benchmark::Bench& bench) s >> qc; ankerl::nanobench::doNotOptimizeAway(qc.vvec->size()); } - // Worker deserialize (mirrors PopAndDeserializeMessages). + // Worker deserialize (mirrors PopAndDeserializeMessages, which built + // the typed object via std::make_shared()). { CDataStream s(wire); - llmq::CDKGContribution qc; - s >> qc; - ankerl::nanobench::doNotOptimizeAway(qc.contributions->blobs.size()); + auto qc = std::make_shared(); + s >> *qc; + ankerl::nanobench::doNotOptimizeAway(qc->contributions->blobs.size()); } }); } @@ -142,17 +143,20 @@ static void DKG_QPCOMMITMENT_DoubleDeserialize(benchmark::Bench& bench) const CDataStream wire = BuildSerializedPrematureCommitment(50); bench.minEpochIterations(2000).run([&] { + // Structural check on a copy (mirrors CheckDKGMessageStructure). { CDataStream s(wire); llmq::CDKGPrematureCommitment qc; s >> qc; ankerl::nanobench::doNotOptimizeAway(qc.validMembers.size()); } + // Worker deserialize (mirrors PopAndDeserializeMessages, which built + // the typed object via std::make_shared()). { CDataStream s(wire); - llmq::CDKGPrematureCommitment qc; - s >> qc; - ankerl::nanobench::doNotOptimizeAway(qc.validMembers.size()); + auto qc = std::make_shared(); + s >> *qc; + ankerl::nanobench::doNotOptimizeAway(qc->validMembers.size()); } }); } From 506a5e485b397bec6914ddc8f338cdee87f002fe Mon Sep 17 00:00:00 2001 From: PastaClaw Date: Thu, 2 Jul 2026 19:43:42 -0500 Subject: [PATCH 5/5] fix: avoid poisoning seen DKG hashes on quota drops --- src/llmq/dkgsessionhandler.h | 3 ++- test/util/data/non-backported.txt | 1 + 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/src/llmq/dkgsessionhandler.h b/src/llmq/dkgsessionhandler.h index 885d7bda233a..b3beac2917f3 100644 --- a/src/llmq/dkgsessionhandler.h +++ b/src/llmq/dkgsessionhandler.h @@ -83,7 +83,7 @@ class CDKGPendingMessages // Check duplicates before charging the per-node quota so a peer that // resends the same hash cannot exhaust its budget with dupes. - if (!seenMessages.emplace(hash).second) { + if (seenMessages.count(hash) != 0) { LogPrint(BCLog::LLMQ_DKG, "CDKGPendingMessages::%s -- already seen %s, peer=%d\n", __func__, hash.ToString(), from); return; @@ -96,6 +96,7 @@ class CDKGPendingMessages } messagesPerNode[from]++; + seenMessages.emplace(hash); pendingMessages.emplace_back(from, std::move(msg)); } diff --git a/test/util/data/non-backported.txt b/test/util/data/non-backported.txt index 7b49af36a695..14b693d36d27 100644 --- a/test/util/data/non-backported.txt +++ b/test/util/data/non-backported.txt @@ -2,6 +2,7 @@ src/active/*.cpp src/active/*.h src/batchedlogger.* src/bench/bls*.cpp +src/bench/dkg_deserialize.cpp src/bls/*.cpp src/bls/*.h src/cachemap.h