From 30ae35f881f8147ab815041dd312549c3f46f986 Mon Sep 17 00:00:00 2001 From: Pablo Deymonnaz Date: Tue, 21 Apr 2026 17:09:31 -0300 Subject: [PATCH 1/3] Add metrics for network message sizes and aggregated signature sizes. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Extend the three `lean_gossip_{block,attestation,aggregation}_size_bytes` histograms with a `compression` label (raw SSZ vs snappy on-wire) and observe them on both publish and receive paths instead of only receive. Add new `lean_reqresp_request_size_bytes` and `lean_reqresp_response_chunk_size_bytes` histograms for the Status and BlocksByRoot protocols, labeled by `{protocol, compression}`. The req/resp encoding helpers (`write_payload`, `decode_payload`) now return the compressed byte count so the codec can record both sizes without re-encoding. Add `lean_aggregated_proof_size_bytes` histogram observed in `aggregate_job` after each `AggregatedSignatureProof` is built, recording the size of `proof_data` — the variable-length portion of post-quantum aggregated signatures. Re-export `HistogramVec` and `register_histogram_vec!` from `ethlambda-metrics` so labeled histograms can be used workspace-wide. 
--- crates/blockchain/src/aggregation.rs | 4 +- crates/blockchain/src/metrics.rs | 25 ++++++ crates/common/metrics/src/lib.rs | 6 +- crates/net/p2p/src/gossipsub/handler.rs | 15 +++- crates/net/p2p/src/metrics.rs | 112 ++++++++++++++++++++---- crates/net/p2p/src/req_resp/codec.rs | 69 ++++++++++++--- crates/net/p2p/src/req_resp/encoding.rs | 21 ++++- docs/metrics.md | 6 ++ 8 files changed, 219 insertions(+), 39 deletions(-) diff --git a/crates/blockchain/src/aggregation.rs b/crates/blockchain/src/aggregation.rs index 07829164..e100b035 100644 --- a/crates/blockchain/src/aggregation.rs +++ b/crates/blockchain/src/aggregation.rs @@ -290,10 +290,12 @@ pub fn aggregate_job(job: AggregationJob) -> Option { participants.dedup(); let aggregation_bits = aggregation_bits_from_validator_indices(&participants); + let proof = AggregatedSignatureProof::new(aggregation_bits, proof_data); + metrics::observe_aggregated_proof_size(proof.proof_data.len()); Some(AggregatedGroupOutput { hashed: job.hashed, - proof: AggregatedSignatureProof::new(aggregation_bits, proof_data), + proof, participants, keys_to_delete: job.keys_to_delete, }) diff --git a/crates/blockchain/src/metrics.rs b/crates/blockchain/src/metrics.rs index a14dd06a..2307dff4 100644 --- a/crates/blockchain/src/metrics.rs +++ b/crates/blockchain/src/metrics.rs @@ -266,6 +266,25 @@ static LEAN_PQ_SIG_AGGREGATED_SIGNATURES_VERIFICATION_TIME_SECONDS: std::sync::L .unwrap() }); +static LEAN_AGGREGATED_PROOF_SIZE_BYTES: std::sync::LazyLock = + std::sync::LazyLock::new(|| { + register_histogram!( + "lean_aggregated_proof_size_bytes", + "Bytes size of an aggregated signature proof's proof_data field", + vec![ + 1024.0, + 4096.0, + 16384.0, + 65536.0, + 131_072.0, + 262_144.0, + 524_288.0, + 1_048_576.0 + ] + ) + .unwrap() + }); + static LEAN_COMMITTEE_SIGNATURES_AGGREGATION_TIME_SECONDS: std::sync::LazyLock = std::sync::LazyLock::new(|| { register_histogram!( @@ -396,6 +415,7 @@ pub fn init() { 
std::sync::LazyLock::force(&LEAN_PQ_SIG_AGGREGATED_SIGNATURES_BUILDING_TIME_SECONDS); std::sync::LazyLock::force(&LEAN_PQ_SIG_AGGREGATED_SIGNATURES_VERIFICATION_TIME_SECONDS); std::sync::LazyLock::force(&LEAN_COMMITTEE_SIGNATURES_AGGREGATION_TIME_SECONDS); + std::sync::LazyLock::force(&LEAN_AGGREGATED_PROOF_SIZE_BYTES); std::sync::LazyLock::force(&LEAN_FORK_CHOICE_REORG_DEPTH); // Block production std::sync::LazyLock::force(&LEAN_BLOCK_AGGREGATED_PAYLOADS); @@ -530,6 +550,11 @@ pub fn time_pq_sig_aggregated_signatures_verification() -> TimingGuard { TimingGuard::new(&LEAN_PQ_SIG_AGGREGATED_SIGNATURES_VERIFICATION_TIME_SECONDS) } +/// Observe the size of an aggregated signature proof's `proof_data` bytes. +pub fn observe_aggregated_proof_size(bytes: usize) { + LEAN_AGGREGATED_PROOF_SIZE_BYTES.observe(bytes as f64); +} + /// Observe committee-signature aggregation duration. Measured in the /// off-thread worker and reported back via an `AggregationDone` message, so a /// drop-guard that crosses the thread boundary is not appropriate here. 
diff --git a/crates/common/metrics/src/lib.rs b/crates/common/metrics/src/lib.rs index 9e508bbd..1000fb3f 100644 --- a/crates/common/metrics/src/lib.rs +++ b/crates/common/metrics/src/lib.rs @@ -5,9 +5,9 @@ pub mod timing; // Re-export prometheus types and macros we use pub use prometheus::{ - Encoder, Error as PrometheusError, Histogram, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, - TextEncoder, gather, register_histogram, register_int_counter, register_int_counter_vec, - register_int_gauge, register_int_gauge_vec, + Encoder, Error as PrometheusError, Histogram, HistogramVec, IntCounter, IntCounterVec, + IntGauge, IntGaugeVec, TextEncoder, gather, register_histogram, register_histogram_vec, + register_int_counter, register_int_counter_vec, register_int_gauge, register_int_gauge_vec, }; // Re-export commonly used items diff --git a/crates/net/p2p/src/gossipsub/handler.rs b/crates/net/p2p/src/gossipsub/handler.rs index 0955ab4a..fd13e58e 100644 --- a/crates/net/p2p/src/gossipsub/handler.rs +++ b/crates/net/p2p/src/gossipsub/handler.rs @@ -29,12 +29,13 @@ pub async fn handle_gossipsub_message(server: &mut P2PServer, event: Event) { let topic_kind = message.topic.as_str().split("/").nth(3); match topic_kind { Some(BLOCK_TOPIC_KIND) => { + let compressed_len = message.data.len(); let Ok(uncompressed_data) = decompress_message(&message.data) .inspect_err(|err| error!(%err, "Failed to decompress gossipped block")) else { return; }; - metrics::observe_gossip_block_size(uncompressed_data.len()); + metrics::observe_gossip_block_size(uncompressed_data.len(), compressed_len); let Ok(signed_block) = SignedBlock::from_ssz_bytes(&uncompressed_data) .inspect_err(|err| error!(?err, "Failed to decode gossipped block")) @@ -61,12 +62,13 @@ pub async fn handle_gossipsub_message(server: &mut P2PServer, event: Event) { } } Some(AGGREGATION_TOPIC_KIND) => { + let compressed_len = message.data.len(); let Ok(uncompressed_data) = decompress_message(&message.data) .inspect_err(|err| 
error!(%err, "Failed to decompress gossipped aggregation")) else { return; }; - metrics::observe_gossip_aggregation_size(uncompressed_data.len()); + metrics::observe_gossip_aggregation_size(uncompressed_data.len(), compressed_len); let Ok(aggregation) = SignedAggregatedAttestation::from_ssz_bytes(&uncompressed_data) .inspect_err(|err| error!(?err, "Failed to decode gossipped aggregation")) @@ -91,12 +93,13 @@ pub async fn handle_gossipsub_message(server: &mut P2PServer, event: Event) { } } Some(kind) if kind.starts_with(ATTESTATION_SUBNET_TOPIC_PREFIX) => { + let compressed_len = message.data.len(); let Ok(uncompressed_data) = decompress_message(&message.data) .inspect_err(|err| error!(%err, "Failed to decompress gossipped attestation")) else { return; }; - metrics::observe_gossip_attestation_size(uncompressed_data.len()); + metrics::observe_gossip_attestation_size(uncompressed_data.len(), compressed_len); let Ok(signed_attestation) = SignedAttestation::from_ssz_bytes(&uncompressed_data) .inspect_err(|err| error!(?err, "Failed to decode gossipped attestation")) @@ -138,6 +141,8 @@ pub async fn publish_attestation(server: &mut P2PServer, attestation: SignedAtte // Compress with raw snappy let compressed = compress_message(&ssz_bytes); + metrics::observe_gossip_attestation_size(ssz_bytes.len(), compressed.len()); + // Look up subscribed topic or construct on-the-fly for gossipsub fanout let topic = server .attestation_topics @@ -171,6 +176,8 @@ pub async fn publish_block(server: &mut P2PServer, signed_block: SignedBlock) { // Compress with raw snappy let compressed = compress_message(&ssz_bytes); + metrics::observe_gossip_block_size(ssz_bytes.len(), compressed.len()); + // Publish to gossipsub server .swarm_handle @@ -197,6 +204,8 @@ pub async fn publish_aggregated_attestation( // Compress with raw snappy let compressed = compress_message(&ssz_bytes); + metrics::observe_gossip_aggregation_size(ssz_bytes.len(), compressed.len()); + // Publish to the aggregation topic 
server .swarm_handle diff --git a/crates/net/p2p/src/metrics.rs b/crates/net/p2p/src/metrics.rs index d6d2c29b..66520d50 100644 --- a/crates/net/p2p/src/metrics.rs +++ b/crates/net/p2p/src/metrics.rs @@ -69,11 +69,16 @@ static LEAN_PEER_DISCONNECTION_EVENTS_TOTAL: LazyLock = LazyLock: }); // --- Gossip Message Size Histograms --- +// +// `compression` label values: +// - `"raw"`: size of SSZ-encoded payload before snappy compression +// - `"snappy"`: size of the on-wire snappy-compressed payload -static LEAN_GOSSIP_BLOCK_SIZE_BYTES: LazyLock = LazyLock::new(|| { - register_histogram!( +static LEAN_GOSSIP_BLOCK_SIZE_BYTES: LazyLock = LazyLock::new(|| { + register_histogram_vec!( "lean_gossip_block_size_bytes", "Bytes size of a gossip block message", + &["compression"], vec![ 10_000.0, 50_000.0, @@ -88,19 +93,21 @@ static LEAN_GOSSIP_BLOCK_SIZE_BYTES: LazyLock = LazyLock::new(|| { .unwrap() }); -static LEAN_GOSSIP_ATTESTATION_SIZE_BYTES: LazyLock = LazyLock::new(|| { - register_histogram!( +static LEAN_GOSSIP_ATTESTATION_SIZE_BYTES: LazyLock = LazyLock::new(|| { + register_histogram_vec!( "lean_gossip_attestation_size_bytes", "Bytes size of a gossip attestation message", + &["compression"], vec![512.0, 1024.0, 2048.0, 4096.0, 8192.0, 16384.0] ) .unwrap() }); -static LEAN_GOSSIP_AGGREGATION_SIZE_BYTES: LazyLock = LazyLock::new(|| { - register_histogram!( +static LEAN_GOSSIP_AGGREGATION_SIZE_BYTES: LazyLock = LazyLock::new(|| { + register_histogram_vec!( "lean_gossip_aggregation_size_bytes", "Bytes size of a gossip aggregated attestation message", + &["compression"], vec![ 1024.0, 4096.0, @@ -115,19 +122,94 @@ static LEAN_GOSSIP_AGGREGATION_SIZE_BYTES: LazyLock = LazyLock::new(| .unwrap() }); -/// Observe the size of a gossip block message. -pub fn observe_gossip_block_size(bytes: usize) { - LEAN_GOSSIP_BLOCK_SIZE_BYTES.observe(bytes as f64); +/// Observe the size of a gossip block message, recording both the raw SSZ +/// size and the snappy-compressed on-wire size. 
+pub fn observe_gossip_block_size(raw: usize, snappy: usize) { + LEAN_GOSSIP_BLOCK_SIZE_BYTES + .with_label_values(&["raw"]) + .observe(raw as f64); + LEAN_GOSSIP_BLOCK_SIZE_BYTES + .with_label_values(&["snappy"]) + .observe(snappy as f64); } -/// Observe the size of a gossip attestation message. -pub fn observe_gossip_attestation_size(bytes: usize) { - LEAN_GOSSIP_ATTESTATION_SIZE_BYTES.observe(bytes as f64); +/// Observe the size of a gossip attestation message, recording both the raw +/// SSZ size and the snappy-compressed on-wire size. +pub fn observe_gossip_attestation_size(raw: usize, snappy: usize) { + LEAN_GOSSIP_ATTESTATION_SIZE_BYTES + .with_label_values(&["raw"]) + .observe(raw as f64); + LEAN_GOSSIP_ATTESTATION_SIZE_BYTES + .with_label_values(&["snappy"]) + .observe(snappy as f64); } -/// Observe the size of a gossip aggregated attestation message. -pub fn observe_gossip_aggregation_size(bytes: usize) { - LEAN_GOSSIP_AGGREGATION_SIZE_BYTES.observe(bytes as f64); +/// Observe the size of a gossip aggregated attestation message, recording both +/// the raw SSZ size and the snappy-compressed on-wire size. +pub fn observe_gossip_aggregation_size(raw: usize, snappy: usize) { + LEAN_GOSSIP_AGGREGATION_SIZE_BYTES + .with_label_values(&["raw"]) + .observe(raw as f64); + LEAN_GOSSIP_AGGREGATION_SIZE_BYTES + .with_label_values(&["snappy"]) + .observe(snappy as f64); +} + +// --- Req/Resp Message Size Histograms --- +// +// `protocol` label: `"status"` or `"blocks_by_root"`. +// `compression` label: `"raw"` (SSZ) or `"snappy"` (on-wire snappy frame +// bytes only, excluding the varint length prefix and the response-code byte). 
+ +static LEAN_REQRESP_REQUEST_SIZE_BYTES: LazyLock = LazyLock::new(|| { + register_histogram_vec!( + "lean_reqresp_request_size_bytes", + "Bytes size of a req/resp request", + &["protocol", "compression"], + vec![64.0, 128.0, 256.0, 512.0, 1024.0, 4096.0, 16384.0, 65536.0] + ) + .unwrap() +}); + +static LEAN_REQRESP_RESPONSE_CHUNK_SIZE_BYTES: LazyLock = LazyLock::new(|| { + register_histogram_vec!( + "lean_reqresp_response_chunk_size_bytes", + "Bytes size of a single req/resp response chunk", + &["protocol", "compression"], + vec![ + 128.0, + 1024.0, + 10_000.0, + 100_000.0, + 500_000.0, + 1_000_000.0, + 5_000_000.0, + 10_000_000.0 + ] + ) + .unwrap() +}); + +/// Observe the size of a req/resp request, recording both the raw SSZ size +/// and the snappy-compressed on-wire size. +pub fn observe_reqresp_request_size(protocol: &str, raw: usize, snappy: usize) { + LEAN_REQRESP_REQUEST_SIZE_BYTES + .with_label_values(&[protocol, "raw"]) + .observe(raw as f64); + LEAN_REQRESP_REQUEST_SIZE_BYTES + .with_label_values(&[protocol, "snappy"]) + .observe(snappy as f64); +} + +/// Observe the size of a single req/resp response chunk, recording both the +/// raw SSZ size and the snappy-compressed on-wire size. +pub fn observe_reqresp_response_chunk_size(protocol: &str, raw: usize, snappy: usize) { + LEAN_REQRESP_RESPONSE_CHUNK_SIZE_BYTES + .with_label_values(&[protocol, "raw"]) + .observe(raw as f64); + LEAN_REQRESP_RESPONSE_CHUNK_SIZE_BYTES + .with_label_values(&[protocol, "snappy"]) + .observe(snappy as f64); } /// Set the attestation committee subnet gauge. diff --git a/crates/net/p2p/src/req_resp/codec.rs b/crates/net/p2p/src/req_resp/codec.rs index e85f440a..43764a2e 100644 --- a/crates/net/p2p/src/req_resp/codec.rs +++ b/crates/net/p2p/src/req_resp/codec.rs @@ -12,8 +12,19 @@ use super::{ }, }; +use crate::metrics; use ethlambda_types::block::SignedBlock; +/// Short label extracted from a libp2p protocol id, used as the `protocol` +/// label on req/resp size metrics. 
+fn protocol_label(protocol: &str) -> &'static str { + match protocol { + STATUS_PROTOCOL_V1 => "status", + BLOCKS_BY_ROOT_PROTOCOL_V1 => "blocks_by_root", + _ => "unknown", + } +} + #[derive(Debug, Clone, Default)] pub struct Codec; @@ -30,7 +41,10 @@ impl libp2p::request_response::Codec for Codec { where T: AsyncRead + Unpin + Send, { - let payload = decode_payload(io).await?; + let decoded = decode_payload(io).await?; + let payload = decoded.uncompressed; + let label = protocol_label(protocol.as_ref()); + metrics::observe_reqresp_request_size(label, payload.len(), decoded.compressed_size); match protocol.as_ref() { STATUS_PROTOCOL_V1 => { @@ -60,9 +74,10 @@ impl libp2p::request_response::Codec for Codec { where T: AsyncRead + Unpin + Send, { + let label = protocol_label(protocol.as_ref()); match protocol.as_ref() { - STATUS_PROTOCOL_V1 => decode_status_response(io).await, - BLOCKS_BY_ROOT_PROTOCOL_V1 => decode_blocks_by_root_response(io).await, + STATUS_PROTOCOL_V1 => decode_status_response(io, label).await, + BLOCKS_BY_ROOT_PROTOCOL_V1 => decode_blocks_by_root_response(io, label).await, _ => Err(io::Error::new( io::ErrorKind::InvalidData, format!("unknown protocol: {}", protocol.as_ref()), @@ -72,7 +87,7 @@ impl libp2p::request_response::Codec for Codec { async fn write_request( &mut self, - _: &Self::Protocol, + protocol: &Self::Protocol, io: &mut T, req: Self::Request, ) -> io::Result<()> @@ -86,18 +101,22 @@ impl libp2p::request_response::Codec for Codec { Request::BlocksByRoot(request) => request.to_ssz(), }; - write_payload(io, &encoded).await + let compressed_size = write_payload(io, &encoded).await?; + let label = protocol_label(protocol.as_ref()); + metrics::observe_reqresp_request_size(label, encoded.len(), compressed_size); + Ok(()) } async fn write_response( &mut self, - _: &Self::Protocol, + protocol: &Self::Protocol, io: &mut T, resp: Self::Response, ) -> io::Result<()> where T: AsyncWrite + Unpin + Send, { + let label = 
protocol_label(protocol.as_ref()); match resp { Response::Success { payload } => { match &payload { @@ -105,7 +124,13 @@ impl libp2p::request_response::Codec for Codec { // Send success code (0) io.write_all(&[ResponseCode::SUCCESS.into()]).await?; let encoded = status.to_ssz(); - write_payload(io, &encoded).await + let compressed_size = write_payload(io, &encoded).await?; + metrics::observe_reqresp_response_chunk_size( + label, + encoded.len(), + compressed_size, + ); + Ok(()) } ResponsePayload::BlocksByRoot(blocks) => { // Write each block as a separate chunk. @@ -123,7 +148,12 @@ impl libp2p::request_response::Codec for Codec { continue; } io.write_all(&[ResponseCode::SUCCESS.into()]).await?; - write_payload(io, &encoded).await?; + let compressed_size = write_payload(io, &encoded).await?; + metrics::observe_reqresp_response_chunk_size( + label, + encoded.len(), + compressed_size, + ); } // Empty response if no blocks found (stream just ends) Ok(()) @@ -137,7 +167,8 @@ impl libp2p::request_response::Codec for Codec { // Error messages are SSZ-encoded as List[byte, 256] let encoded = message.to_ssz(); - write_payload(io, &encoded).await + write_payload(io, &encoded).await?; + Ok(()) } } } @@ -164,7 +195,7 @@ impl libp2p::request_response::Codec for Codec { /// - I/O error occurs while reading the response code or payload /// - Peer's error message cannot be SSZ-decoded (InvalidData) /// - Peer's Status payload cannot be SSZ-decoded (InvalidData) -async fn decode_status_response(io: &mut T) -> io::Result +async fn decode_status_response(io: &mut T, protocol_label: &str) -> io::Result where T: AsyncRead + Unpin + Send, { @@ -173,7 +204,13 @@ where .await?; let code = ResponseCode::from(result_byte); - let payload = decode_payload(io).await?; + let decoded = decode_payload(io).await?; + let payload = decoded.uncompressed; + metrics::observe_reqresp_response_chunk_size( + protocol_label, + payload.len(), + decoded.compressed_size, + ); if code != ResponseCode::SUCCESS 
{ let message = ErrorMessage::from_ssz_bytes(&payload).map_err(|err| { @@ -215,7 +252,7 @@ where /// /// Note: Error chunks from the peer (non-SUCCESS response codes) do not cause this /// function to return `Err` - they are logged and skipped. -async fn decode_blocks_by_root_response(io: &mut T) -> io::Result +async fn decode_blocks_by_root_response(io: &mut T, protocol_label: &str) -> io::Result where T: AsyncRead + Unpin + Send, { @@ -232,7 +269,13 @@ where } let code = ResponseCode::from(result_byte); - let payload = decode_payload(io).await?; + let decoded = decode_payload(io).await?; + let payload = decoded.uncompressed; + metrics::observe_reqresp_response_chunk_size( + protocol_label, + payload.len(), + decoded.compressed_size, + ); if code != ResponseCode::SUCCESS { let error_message = ErrorMessage::from_ssz_bytes(&payload) diff --git a/crates/net/p2p/src/req_resp/encoding.rs b/crates/net/p2p/src/req_resp/encoding.rs index 7a4116c4..1756a0ec 100644 --- a/crates/net/p2p/src/req_resp/encoding.rs +++ b/crates/net/p2p/src/req_resp/encoding.rs @@ -8,8 +8,15 @@ pub const MAX_PAYLOAD_SIZE: usize = 10 * 1024 * 1024; // 10 MB // https://github.com/ethereum/consensus-specs/blob/master/specs/phase0/p2p-interface.md#max_message_size pub const MAX_COMPRESSED_PAYLOAD_SIZE: usize = 32 + MAX_PAYLOAD_SIZE + MAX_PAYLOAD_SIZE / 6 + 1024; // ~12 MB +/// Decoded payload together with the size of its on-wire snappy-compressed +/// bytes (excluding the varint length prefix). +pub struct DecodedPayload { + pub uncompressed: Vec, + pub compressed_size: usize, +} + /// Decode a varint-prefixed, snappy-compressed SSZ payload from an async reader. 
-pub async fn decode_payload(io: &mut T) -> io::Result> +pub async fn decode_payload(io: &mut T) -> io::Result where T: AsyncRead + Unpin + Send, { @@ -26,6 +33,7 @@ where )); } let (size, rest) = decode_varint(&buf)?; + let compressed_size = rest.len(); if size as usize > MAX_PAYLOAD_SIZE { return Err(io::Error::new( @@ -44,10 +52,15 @@ where )); } - Ok(uncompressed) + Ok(DecodedPayload { + uncompressed, + compressed_size, + }) } -pub async fn write_payload(io: &mut T, encoded: &[u8]) -> io::Result<()> +/// Write a varint-prefixed, snappy-compressed SSZ payload. Returns the size +/// of the snappy-compressed bytes (excluding the varint length prefix). +pub async fn write_payload(io: &mut T, encoded: &[u8]) -> io::Result where T: AsyncWrite + Unpin, { @@ -70,7 +83,7 @@ where io.write_all(varint_buf).await?; io.write_all(&buf).await?; - Ok(()) + Ok(buf.len()) } /// Encodes a u32 as a varint into the provided buffer, returning a slice of the buffer diff --git a/docs/metrics.md b/docs/metrics.md index 3c956fd8..0d331d88 100644 --- a/docs/metrics.md +++ b/docs/metrics.md @@ -29,6 +29,7 @@ The exposed metrics follow [the leanMetrics specification](https://github.com/le | `lean_pq_sig_attestations_in_aggregated_signatures_total` | Counter | Total number of attestations included into aggregated signatures | On aggregated signature production | | | ✅ | | `lean_pq_sig_aggregated_signatures_building_time_seconds` | Histogram | Time taken to build an aggregated attestation signature | On aggregated signature production | | 0.1, 0.25, 0.5, 0.75, 1, 1.25, 1.5, 2, 4 | ✅ | | `lean_pq_sig_aggregated_signatures_verification_time_seconds` | Histogram | Time taken to verify an aggregated attestation signature | On aggregated signature verification | | 0.1, 0.25, 0.5, 0.75, 1, 1.25, 1.5, 2, 4 | ✅ | +| `lean_aggregated_proof_size_bytes` | Histogram | Bytes size of an aggregated signature proof's `proof_data` field | On aggregated signature production | | 1024, 4096, 16384, 65536, 
131072, 262144, 524288, 1048576 | ✅ | ## Fork-Choice Metrics @@ -79,6 +80,11 @@ The exposed metrics follow [the leanMetrics specification](https://github.com/le |`lean_connected_peers`| Gauge | Number of connected peers | On scrape | client=ethlambda,grandine,lantern,lighthouse,qlean,ream,zeam | ✅(*) | |`lean_peer_connection_events_total`| Counter | Total number of peer connection events | On peer connection | direction=inbound,outbound
result=success,timeout,error | ✅ | |`lean_peer_disconnection_events_total`| Counter | Total number of peer disconnection events | On peer disconnection | direction=inbound,outbound
reason=timeout,remote_close,local_close,error | ✅ | +|`lean_gossip_block_size_bytes`| Histogram | Bytes size of a gossip block message (raw SSZ or snappy on-wire) | On gossip block send/receive | compression=raw,snappy | ✅ | +|`lean_gossip_attestation_size_bytes`| Histogram | Bytes size of a gossip attestation message (raw SSZ or snappy on-wire) | On gossip attestation send/receive | compression=raw,snappy | ✅ | +|`lean_gossip_aggregation_size_bytes`| Histogram | Bytes size of a gossip aggregated attestation message (raw SSZ or snappy on-wire) | On gossip aggregation send/receive | compression=raw,snappy | ✅ | +|`lean_reqresp_request_size_bytes`| Histogram | Bytes size of a req/resp request (raw SSZ or snappy on-wire) | On req/resp request send/receive | protocol=status,blocks_by_root
compression=raw,snappy | ✅ | +|`lean_reqresp_response_chunk_size_bytes`| Histogram | Bytes size of a single req/resp response chunk (raw SSZ or snappy on-wire) | On req/resp response chunk send/receive | protocol=status,blocks_by_root
compression=raw,snappy | ✅ | --- From 56ce3a2952898ad40d3f2d17aab12ca41b8a8849 Mon Sep 17 00:00:00 2001 From: Pablo Deymonnaz Date: Tue, 21 Apr 2026 18:18:45 -0300 Subject: [PATCH 2/3] Accept single-key genesis + annotated-validator layouts. The `lean-quickstart` genesis generator still emits a single XMSS key per validator: `config.yaml`'s `GENESIS_VALIDATORS` is a flat hex-string list, and `annotated_validators.yaml` references one `validator_N_sk.ssz` file per validator with no `attester`/`proposer` split. zeam and lantern already treat that shared key as both the attestation and proposal key for the same validator; ethlambda rejected both files outright, which prevented launching the client against the generator's output. Extend the two loaders to accept the single-key layout in addition to the canonical dual-key one: - `GenesisValidatorEntry` (`crates/common/types/src/genesis.rs`) now deserializes either a map with `attestation_pubkey` + `proposal_pubkey` or a plain hex string; the flat string is used for both roles. - `classify_role` / `read_validator_keys` in `bin/ethlambda/src/main.rs` add a `Shared` role that populates both attester and proposer slots from the same private-key file. Files with the existing `attester` / `proposer` substrings keep their prior routing. A `Shared` entry cannot be mixed with role-specific entries for the same validator. Added a round-trip test for the flat-string genesis variant. Unblocks running ethlambda against `make run-devnet` / lean-quickstart's current generator output. 
--- bin/ethlambda/src/main.rs | 54 +++++++++++------ crates/common/types/src/genesis.rs | 94 +++++++++++++++++++++++++----- 2 files changed, 115 insertions(+), 33 deletions(-) diff --git a/bin/ethlambda/src/main.rs b/bin/ethlambda/src/main.rs index 3c3f816c..c33fa630 100644 --- a/bin/ethlambda/src/main.rs +++ b/bin/ethlambda/src/main.rs @@ -290,13 +290,18 @@ where enum ValidatorKeyRole { Attestation, Proposal, + /// Single key used for both roles (single-key devnet layout where + /// `generate-genesis.sh` emits one `validator_N_sk.ssz` per validator with + /// no attester/proposer split). + Shared, } -/// Classify a privkey file as attestation or proposal based on the filename. +/// Classify a privkey file as attestation, proposal, or shared based on the +/// filename. /// /// Matches zeam's (`pkgs/cli/src/node.zig:540`) and lantern's -/// (`client_keys.c:606`) routing, which lets all three clients share the -/// `lean-quickstart` generator output unchanged. +/// (`client_keys.c:606`) routing for the dual-key generator output, plus an +/// extra Shared case for the single-key generator output. fn classify_role(file: &Path) -> Result { let name = file .file_name() @@ -307,9 +312,7 @@ fn classify_role(file: &Path) -> Result { match (is_attester, is_proposer) { (true, false) => Ok(ValidatorKeyRole::Attestation), (false, true) => Ok(ValidatorKeyRole::Proposal), - (false, false) => Err(format!( - "filename '{name}' must contain 'attester' or 'proposer'" - )), + (false, false) => Ok(ValidatorKeyRole::Shared), (true, true) => Err(format!( "filename '{name}' contains both 'attester' and 'proposer'; ambiguous" )), @@ -347,23 +350,38 @@ fn read_validator_keys( } }; - // Group entries per validator index, routing each to its role slot. + // Group entries per validator index, routing each to its role slot. A + // `Shared` entry populates both attester and proposer slots; it cannot be + // combined with role-specific entries for the same validator. 
let mut grouped: BTreeMap = BTreeMap::new(); for entry in validator_vec { let role = classify_role(&entry.privkey_file)?; let path = resolve_path(&entry.privkey_file); - let slots = grouped.entry(entry.index).or_default(); - let target = match role { - ValidatorKeyRole::Attestation => &mut slots.attestation, - ValidatorKeyRole::Proposal => &mut slots.proposal, - }; - if target.is_some() { - return Err(format!( - "validator {}: duplicate {role:?} entry", - entry.index - )); + let idx = entry.index; + let slots = grouped.entry(idx).or_default(); + match role { + ValidatorKeyRole::Attestation => { + if slots.attestation.is_some() { + return Err(format!("validator {idx}: duplicate Attestation entry")); + } + slots.attestation = Some(path); + } + ValidatorKeyRole::Proposal => { + if slots.proposal.is_some() { + return Err(format!("validator {idx}: duplicate Proposal entry")); + } + slots.proposal = Some(path); + } + ValidatorKeyRole::Shared => { + if slots.attestation.is_some() || slots.proposal.is_some() { + return Err(format!( + "validator {idx}: cannot mix shared key with role-specific entries" + )); + } + slots.attestation = Some(path.clone()); + slots.proposal = Some(path); + } } - *target = Some(path); } let load_key = |path: &Path, purpose: &str| -> Result { diff --git a/crates/common/types/src/genesis.rs b/crates/common/types/src/genesis.rs index 27baebf6..4bedf7f1 100644 --- a/crates/common/types/src/genesis.rs +++ b/crates/common/types/src/genesis.rs @@ -3,14 +3,54 @@ use serde::Deserialize; use crate::state::{Validator, ValidatorPubkeyBytes}; /// A single validator entry in the genesis config with dual public keys. +/// +/// Deserializes from either shape: +/// +/// - A map with `attestation_pubkey` and `proposal_pubkey` (the canonical, +/// production-style layout with independent signing keys per role). +/// - A plain hex string (the lean-quickstart / single-key devnet layout), +/// in which case the same pubkey is used for both roles. 
This matches how +/// zeam and lantern already interpret that field. #[derive(Debug, Clone, Deserialize)] +#[serde(try_from = "GenesisValidatorEntryRaw")] pub struct GenesisValidatorEntry { - #[serde(deserialize_with = "deser_pubkey_hex")] pub attestation_pubkey: ValidatorPubkeyBytes, - #[serde(deserialize_with = "deser_pubkey_hex")] pub proposal_pubkey: ValidatorPubkeyBytes, } +#[derive(Deserialize)] +#[serde(untagged)] +enum GenesisValidatorEntryRaw { + Flat(String), + Dual { + attestation_pubkey: String, + proposal_pubkey: String, + }, +} + +impl TryFrom for GenesisValidatorEntry { + type Error = String; + + fn try_from(raw: GenesisValidatorEntryRaw) -> Result { + match raw { + GenesisValidatorEntryRaw::Flat(pubkey) => { + let bytes = parse_pubkey_hex(&pubkey)?; + Ok(Self { + attestation_pubkey: bytes, + proposal_pubkey: bytes, + }) + } + GenesisValidatorEntryRaw::Dual { + attestation_pubkey, + proposal_pubkey, + } => Ok(Self { + attestation_pubkey: parse_pubkey_hex(&attestation_pubkey)?, + proposal_pubkey: parse_pubkey_hex(&proposal_pubkey)?, + }), + } + } +} + #[derive(Debug, Clone, Deserialize)] pub struct GenesisConfig { #[serde(rename = "GENESIS_TIME")] @@ -33,19 +73,12 @@ impl GenesisConfig { } } -fn deser_pubkey_hex<'de, D>(d: D) -> Result -where - D: serde::Deserializer<'de>, -{ - use serde::de::Error; - - let s = String::deserialize(d)?; - let s = s.strip_prefix("0x").unwrap_or(&s); - let bytes = - hex::decode(s).map_err(|_| D::Error::custom(format!("pubkey is not valid hex: {s}")))?; - bytes.try_into().map_err(|v: Vec| { - D::Error::custom(format!("pubkey has length {} (expected 52)", v.len())) - }) +fn parse_pubkey_hex(s: &str) -> Result { + let s = s.strip_prefix("0x").unwrap_or(s); + let bytes = hex::decode(s).map_err(|_| format!("pubkey is not valid hex: {s}"))?; + bytes + .try_into() + .map_err(|v: Vec| format!("pubkey has length {} (expected 52)", v.len())) } #[cfg(test)] @@ -115,6 +148,37 @@ GENESIS_VALIDATORS: ); } + #[test] + fn 
deserialize_genesis_config_flat_pubkeys() { + // lean-quickstart / single-key devnet layout: one pubkey per validator, + // used for both attestation and proposal roles. + let yaml = format!( + r#"GENESIS_TIME: 1770407233 +GENESIS_VALIDATORS: + - "{ATT_PUBKEY_A}" + - "0x{ATT_PUBKEY_B}" +"# + ); + + let config: GenesisConfig = + serde_yaml_ng::from_str(&yaml).expect("Failed to deserialize flat-key genesis config"); + + assert_eq!(config.genesis_validators.len(), 2); + assert_eq!( + config.genesis_validators[0].attestation_pubkey, + config.genesis_validators[0].proposal_pubkey, + "flat pubkey must be shared across both roles" + ); + assert_eq!( + config.genesis_validators[0].attestation_pubkey, + hex::decode(ATT_PUBKEY_A).unwrap().as_slice() + ); + assert_eq!( + config.genesis_validators[1].attestation_pubkey, + hex::decode(ATT_PUBKEY_B).unwrap().as_slice() + ); + } + #[test] fn state_from_genesis_uses_defaults() { let validators = vec![Validator { From 5a31ae17a957ea04658cf48a5097bd014c854944 Mon Sep 17 00:00:00 2001 From: Pablo Deymonnaz Date: Tue, 21 Apr 2026 18:56:52 -0300 Subject: [PATCH 3/3] Revert "Accept single-key genesis + annotated-validator layouts." This reverts commit 56ce3a2952898ad40d3f2d17aab12ca41b8a8849. --- bin/ethlambda/src/main.rs | 54 ++++++----------- crates/common/types/src/genesis.rs | 94 +++++------------------------- 2 files changed, 33 insertions(+), 115 deletions(-) diff --git a/bin/ethlambda/src/main.rs b/bin/ethlambda/src/main.rs index c33fa630..3c3f816c 100644 --- a/bin/ethlambda/src/main.rs +++ b/bin/ethlambda/src/main.rs @@ -290,18 +290,13 @@ where enum ValidatorKeyRole { Attestation, Proposal, - /// Single key used for both roles (single-key devnet layout where - /// `generate-genesis.sh` emits one `validator_N_sk.ssz` per validator with - /// no attester/proposer split). - Shared, } -/// Classify a privkey file as attestation, proposal, or shared based on the -/// filename. 
+/// Classify a privkey file as attestation or proposal based on the filename. /// /// Matches zeam's (`pkgs/cli/src/node.zig:540`) and lantern's -/// (`client_keys.c:606`) routing for the dual-key generator output, plus an -/// extra Shared case for the single-key generator output. +/// (`client_keys.c:606`) routing, which lets all three clients share the +/// `lean-quickstart` generator output unchanged. fn classify_role(file: &Path) -> Result { let name = file .file_name() @@ -312,7 +307,9 @@ fn classify_role(file: &Path) -> Result { match (is_attester, is_proposer) { (true, false) => Ok(ValidatorKeyRole::Attestation), (false, true) => Ok(ValidatorKeyRole::Proposal), - (false, false) => Ok(ValidatorKeyRole::Shared), + (false, false) => Err(format!( + "filename '{name}' must contain 'attester' or 'proposer'" + )), (true, true) => Err(format!( "filename '{name}' contains both 'attester' and 'proposer'; ambiguous" )), @@ -350,38 +347,23 @@ fn read_validator_keys( } }; - // Group entries per validator index, routing each to its role slot. A - // `Shared` entry populates both attester and proposer slots; it cannot be - // combined with role-specific entries for the same validator. + // Group entries per validator index, routing each to its role slot. 
let mut grouped: BTreeMap = BTreeMap::new(); for entry in validator_vec { let role = classify_role(&entry.privkey_file)?; let path = resolve_path(&entry.privkey_file); - let idx = entry.index; - let slots = grouped.entry(idx).or_default(); - match role { - ValidatorKeyRole::Attestation => { - if slots.attestation.is_some() { - return Err(format!("validator {idx}: duplicate Attestation entry")); - } - slots.attestation = Some(path); - } - ValidatorKeyRole::Proposal => { - if slots.proposal.is_some() { - return Err(format!("validator {idx}: duplicate Proposal entry")); - } - slots.proposal = Some(path); - } - ValidatorKeyRole::Shared => { - if slots.attestation.is_some() || slots.proposal.is_some() { - return Err(format!( - "validator {idx}: cannot mix shared key with role-specific entries" - )); - } - slots.attestation = Some(path.clone()); - slots.proposal = Some(path); - } + let slots = grouped.entry(entry.index).or_default(); + let target = match role { + ValidatorKeyRole::Attestation => &mut slots.attestation, + ValidatorKeyRole::Proposal => &mut slots.proposal, + }; + if target.is_some() { + return Err(format!( + "validator {}: duplicate {role:?} entry", + entry.index + )); } + *target = Some(path); } let load_key = |path: &Path, purpose: &str| -> Result { diff --git a/crates/common/types/src/genesis.rs b/crates/common/types/src/genesis.rs index 4bedf7f1..27baebf6 100644 --- a/crates/common/types/src/genesis.rs +++ b/crates/common/types/src/genesis.rs @@ -3,54 +3,14 @@ use serde::Deserialize; use crate::state::{Validator, ValidatorPubkeyBytes}; /// A single validator entry in the genesis config with dual public keys. -/// -/// Deserializes from either shape: -/// -/// - A map with `attestation_pubkey` and `proposal_pubkey` (the canonical, -/// production-style layout with independent signing keys per role). -/// - A plain hex string (the lean-quickstart / single-key devnet layout), -/// in which case the same pubkey is used for both roles. 
This matches how -/// zeam and lantern already interpret that field. #[derive(Debug, Clone, Deserialize)] -#[serde(try_from = "GenesisValidatorEntryRaw")] pub struct GenesisValidatorEntry { + #[serde(deserialize_with = "deser_pubkey_hex")] pub attestation_pubkey: ValidatorPubkeyBytes, + #[serde(deserialize_with = "deser_pubkey_hex")] pub proposal_pubkey: ValidatorPubkeyBytes, } -#[derive(Deserialize)] -#[serde(untagged)] -enum GenesisValidatorEntryRaw { - Flat(String), - Dual { - attestation_pubkey: String, - proposal_pubkey: String, - }, -} - -impl TryFrom for GenesisValidatorEntry { - type Error = String; - - fn try_from(raw: GenesisValidatorEntryRaw) -> Result { - match raw { - GenesisValidatorEntryRaw::Flat(pubkey) => { - let bytes = parse_pubkey_hex(&pubkey)?; - Ok(Self { - attestation_pubkey: bytes, - proposal_pubkey: bytes, - }) - } - GenesisValidatorEntryRaw::Dual { - attestation_pubkey, - proposal_pubkey, - } => Ok(Self { - attestation_pubkey: parse_pubkey_hex(&attestation_pubkey)?, - proposal_pubkey: parse_pubkey_hex(&proposal_pubkey)?, - }), - } - } -} - #[derive(Debug, Clone, Deserialize)] pub struct GenesisConfig { #[serde(rename = "GENESIS_TIME")] @@ -73,12 +33,19 @@ impl GenesisConfig { } } -fn parse_pubkey_hex(s: &str) -> Result { - let s = s.strip_prefix("0x").unwrap_or(s); - let bytes = hex::decode(s).map_err(|_| format!("pubkey is not valid hex: {s}"))?; - bytes - .try_into() - .map_err(|v: Vec| format!("pubkey has length {} (expected 52)", v.len())) +fn deser_pubkey_hex<'de, D>(d: D) -> Result +where + D: serde::Deserializer<'de>, +{ + use serde::de::Error; + + let s = String::deserialize(d)?; + let s = s.strip_prefix("0x").unwrap_or(&s); + let bytes = + hex::decode(s).map_err(|_| D::Error::custom(format!("pubkey is not valid hex: {s}")))?; + bytes.try_into().map_err(|v: Vec| { + D::Error::custom(format!("pubkey has length {} (expected 52)", v.len())) + }) } #[cfg(test)] @@ -148,37 +115,6 @@ GENESIS_VALIDATORS: ); } - #[test] - fn 
deserialize_genesis_config_flat_pubkeys() { - // lean-quickstart / single-key devnet layout: one pubkey per validator, - // used for both attestation and proposal roles. - let yaml = format!( - r#"GENESIS_TIME: 1770407233 -GENESIS_VALIDATORS: - - "{ATT_PUBKEY_A}" - - "0x{ATT_PUBKEY_B}" -"# - ); - - let config: GenesisConfig = - serde_yaml_ng::from_str(&yaml).expect("Failed to deserialize flat-key genesis config"); - - assert_eq!(config.genesis_validators.len(), 2); - assert_eq!( - config.genesis_validators[0].attestation_pubkey, - config.genesis_validators[0].proposal_pubkey, - "flat pubkey must be shared across both roles" - ); - assert_eq!( - config.genesis_validators[0].attestation_pubkey, - hex::decode(ATT_PUBKEY_A).unwrap().as_slice() - ); - assert_eq!( - config.genesis_validators[1].attestation_pubkey, - hex::decode(ATT_PUBKEY_B).unwrap().as_slice() - ); - } - #[test] fn state_from_genesis_uses_defaults() { let validators = vec![Validator {