diff --git a/crates/blockchain/src/aggregation.rs b/crates/blockchain/src/aggregation.rs index 07829164..e100b035 100644 --- a/crates/blockchain/src/aggregation.rs +++ b/crates/blockchain/src/aggregation.rs @@ -290,10 +290,12 @@ pub fn aggregate_job(job: AggregationJob) -> Option { participants.dedup(); let aggregation_bits = aggregation_bits_from_validator_indices(&participants); + let proof = AggregatedSignatureProof::new(aggregation_bits, proof_data); + metrics::observe_aggregated_proof_size(proof.proof_data.len()); Some(AggregatedGroupOutput { hashed: job.hashed, - proof: AggregatedSignatureProof::new(aggregation_bits, proof_data), + proof, participants, keys_to_delete: job.keys_to_delete, }) diff --git a/crates/blockchain/src/metrics.rs b/crates/blockchain/src/metrics.rs index a14dd06a..2307dff4 100644 --- a/crates/blockchain/src/metrics.rs +++ b/crates/blockchain/src/metrics.rs @@ -266,6 +266,25 @@ static LEAN_PQ_SIG_AGGREGATED_SIGNATURES_VERIFICATION_TIME_SECONDS: std::sync::L .unwrap() }); +static LEAN_AGGREGATED_PROOF_SIZE_BYTES: std::sync::LazyLock = + std::sync::LazyLock::new(|| { + register_histogram!( + "lean_aggregated_proof_size_bytes", + "Bytes size of an aggregated signature proof's proof_data field", + vec![ + 1024.0, + 4096.0, + 16384.0, + 65536.0, + 131_072.0, + 262_144.0, + 524_288.0, + 1_048_576.0 + ] + ) + .unwrap() + }); + static LEAN_COMMITTEE_SIGNATURES_AGGREGATION_TIME_SECONDS: std::sync::LazyLock = std::sync::LazyLock::new(|| { register_histogram!( @@ -396,6 +415,7 @@ pub fn init() { std::sync::LazyLock::force(&LEAN_PQ_SIG_AGGREGATED_SIGNATURES_BUILDING_TIME_SECONDS); std::sync::LazyLock::force(&LEAN_PQ_SIG_AGGREGATED_SIGNATURES_VERIFICATION_TIME_SECONDS); std::sync::LazyLock::force(&LEAN_COMMITTEE_SIGNATURES_AGGREGATION_TIME_SECONDS); + std::sync::LazyLock::force(&LEAN_AGGREGATED_PROOF_SIZE_BYTES); std::sync::LazyLock::force(&LEAN_FORK_CHOICE_REORG_DEPTH); // Block production std::sync::LazyLock::force(&LEAN_BLOCK_AGGREGATED_PAYLOADS); @@ -530,6 +550,11 @@ pub fn time_pq_sig_aggregated_signatures_verification() -> TimingGuard { TimingGuard::new(&LEAN_PQ_SIG_AGGREGATED_SIGNATURES_VERIFICATION_TIME_SECONDS) } +/// Observe the size of an aggregated signature proof's `proof_data` bytes. +pub fn observe_aggregated_proof_size(bytes: usize) { + LEAN_AGGREGATED_PROOF_SIZE_BYTES.observe(bytes as f64); +} + /// Observe committee-signature aggregation duration. Measured in the /// off-thread worker and reported back via an `AggregationDone` message, so a /// drop-guard that crosses the thread boundary is not appropriate here. diff --git a/crates/common/metrics/src/lib.rs b/crates/common/metrics/src/lib.rs index 9e508bbd..1000fb3f 100644 --- a/crates/common/metrics/src/lib.rs +++ b/crates/common/metrics/src/lib.rs @@ -5,9 +5,9 @@ pub mod timing; // Re-export prometheus types and macros we use pub use prometheus::{ - Encoder, Error as PrometheusError, Histogram, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, - TextEncoder, gather, register_histogram, register_int_counter, register_int_counter_vec, - register_int_gauge, register_int_gauge_vec, + Encoder, Error as PrometheusError, Histogram, HistogramVec, IntCounter, IntCounterVec, + IntGauge, IntGaugeVec, TextEncoder, gather, register_histogram, register_histogram_vec, + register_int_counter, register_int_counter_vec, register_int_gauge, register_int_gauge_vec, }; // Re-export commonly used items diff --git a/crates/net/p2p/src/gossipsub/handler.rs b/crates/net/p2p/src/gossipsub/handler.rs index 0955ab4a..fd13e58e 100644 --- a/crates/net/p2p/src/gossipsub/handler.rs +++ b/crates/net/p2p/src/gossipsub/handler.rs @@ -29,12 +29,13 @@ pub async fn handle_gossipsub_message(server: &mut P2PServer, event: Event) { let topic_kind = message.topic.as_str().split("/").nth(3); match topic_kind { Some(BLOCK_TOPIC_KIND) => { + let compressed_len = message.data.len(); let Ok(uncompressed_data) = decompress_message(&message.data) .inspect_err(|err| error!(%err, "Failed to decompress gossipped block")) else { return; }; - metrics::observe_gossip_block_size(uncompressed_data.len()); + metrics::observe_gossip_block_size(uncompressed_data.len(), compressed_len); let Ok(signed_block) = SignedBlock::from_ssz_bytes(&uncompressed_data) .inspect_err(|err| error!(?err, "Failed to decode gossipped block")) @@ -61,12 +62,13 @@ pub async fn handle_gossipsub_message(server: &mut P2PServer, event: Event) { } } Some(AGGREGATION_TOPIC_KIND) => { + let compressed_len = message.data.len(); let Ok(uncompressed_data) = decompress_message(&message.data) .inspect_err(|err| error!(%err, "Failed to decompress gossipped aggregation")) else { return; }; - metrics::observe_gossip_aggregation_size(uncompressed_data.len()); + metrics::observe_gossip_aggregation_size(uncompressed_data.len(), compressed_len); let Ok(aggregation) = SignedAggregatedAttestation::from_ssz_bytes(&uncompressed_data) .inspect_err(|err| error!(?err, "Failed to decode gossipped aggregation")) @@ -91,12 +93,13 @@ pub async fn handle_gossipsub_message(server: &mut P2PServer, event: Event) { } } Some(kind) if kind.starts_with(ATTESTATION_SUBNET_TOPIC_PREFIX) => { + let compressed_len = message.data.len(); let Ok(uncompressed_data) = decompress_message(&message.data) .inspect_err(|err| error!(%err, "Failed to decompress gossipped attestation")) else { return; }; - metrics::observe_gossip_attestation_size(uncompressed_data.len()); + metrics::observe_gossip_attestation_size(uncompressed_data.len(), compressed_len); let Ok(signed_attestation) = SignedAttestation::from_ssz_bytes(&uncompressed_data) .inspect_err(|err| error!(?err, "Failed to decode gossipped attestation")) @@ -138,6 +141,8 @@ pub async fn publish_attestation(server: &mut P2PServer, attestation: SignedAtte // Compress with raw snappy let compressed = compress_message(&ssz_bytes); + metrics::observe_gossip_attestation_size(ssz_bytes.len(), compressed.len()); + // Look up subscribed topic or construct on-the-fly for gossipsub fanout let topic = server .attestation_topics @@ -171,6 +176,8 @@ pub async fn publish_block(server: &mut P2PServer, signed_block: SignedBlock) { // Compress with raw snappy let compressed = compress_message(&ssz_bytes); + metrics::observe_gossip_block_size(ssz_bytes.len(), compressed.len()); + // Publish to gossipsub server .swarm_handle @@ -197,6 +204,8 @@ pub async fn publish_aggregated_attestation( // Compress with raw snappy let compressed = compress_message(&ssz_bytes); + metrics::observe_gossip_aggregation_size(ssz_bytes.len(), compressed.len()); + // Publish to the aggregation topic server .swarm_handle diff --git a/crates/net/p2p/src/metrics.rs b/crates/net/p2p/src/metrics.rs index d6d2c29b..66520d50 100644 --- a/crates/net/p2p/src/metrics.rs +++ b/crates/net/p2p/src/metrics.rs @@ -69,11 +69,16 @@ static LEAN_PEER_DISCONNECTION_EVENTS_TOTAL: LazyLock = LazyLock: }); // --- Gossip Message Size Histograms --- +// +// `compression` label values: +// - `"raw"`: size of SSZ-encoded payload before snappy compression +// - `"snappy"`: size of the on-wire snappy-compressed payload -static LEAN_GOSSIP_BLOCK_SIZE_BYTES: LazyLock = LazyLock::new(|| { - register_histogram!( +static LEAN_GOSSIP_BLOCK_SIZE_BYTES: LazyLock = LazyLock::new(|| { + register_histogram_vec!( "lean_gossip_block_size_bytes", "Bytes size of a gossip block message", + &["compression"], vec![ 10_000.0, 50_000.0, @@ -88,19 +93,21 @@ static LEAN_GOSSIP_BLOCK_SIZE_BYTES: LazyLock = LazyLock::new(|| { .unwrap() }); -static LEAN_GOSSIP_ATTESTATION_SIZE_BYTES: LazyLock = LazyLock::new(|| { - register_histogram!( +static LEAN_GOSSIP_ATTESTATION_SIZE_BYTES: LazyLock = LazyLock::new(|| { + register_histogram_vec!( "lean_gossip_attestation_size_bytes", "Bytes size of a gossip attestation message", + &["compression"], vec![512.0, 1024.0, 2048.0, 4096.0, 8192.0, 16384.0] ) .unwrap() }); -static LEAN_GOSSIP_AGGREGATION_SIZE_BYTES: LazyLock = LazyLock::new(|| { - register_histogram!( +static LEAN_GOSSIP_AGGREGATION_SIZE_BYTES: LazyLock = LazyLock::new(|| { + register_histogram_vec!( "lean_gossip_aggregation_size_bytes", "Bytes size of a gossip aggregated attestation message", + &["compression"], vec![ 1024.0, 4096.0, @@ -115,19 +122,94 @@ static LEAN_GOSSIP_AGGREGATION_SIZE_BYTES: LazyLock = LazyLock::new(| .unwrap() }); -/// Observe the size of a gossip block message. -pub fn observe_gossip_block_size(bytes: usize) { - LEAN_GOSSIP_BLOCK_SIZE_BYTES.observe(bytes as f64); +/// Observe the size of a gossip block message, recording both the raw SSZ +/// size and the snappy-compressed on-wire size. +pub fn observe_gossip_block_size(raw: usize, snappy: usize) { + LEAN_GOSSIP_BLOCK_SIZE_BYTES + .with_label_values(&["raw"]) + .observe(raw as f64); + LEAN_GOSSIP_BLOCK_SIZE_BYTES + .with_label_values(&["snappy"]) + .observe(snappy as f64); } -/// Observe the size of a gossip attestation message. -pub fn observe_gossip_attestation_size(bytes: usize) { - LEAN_GOSSIP_ATTESTATION_SIZE_BYTES.observe(bytes as f64); +/// Observe the size of a gossip attestation message, recording both the raw +/// SSZ size and the snappy-compressed on-wire size. +pub fn observe_gossip_attestation_size(raw: usize, snappy: usize) { + LEAN_GOSSIP_ATTESTATION_SIZE_BYTES + .with_label_values(&["raw"]) + .observe(raw as f64); + LEAN_GOSSIP_ATTESTATION_SIZE_BYTES + .with_label_values(&["snappy"]) + .observe(snappy as f64); } -/// Observe the size of a gossip aggregated attestation message. -pub fn observe_gossip_aggregation_size(bytes: usize) { - LEAN_GOSSIP_AGGREGATION_SIZE_BYTES.observe(bytes as f64); +/// Observe the size of a gossip aggregated attestation message, recording both +/// the raw SSZ size and the snappy-compressed on-wire size. +pub fn observe_gossip_aggregation_size(raw: usize, snappy: usize) { + LEAN_GOSSIP_AGGREGATION_SIZE_BYTES + .with_label_values(&["raw"]) + .observe(raw as f64); + LEAN_GOSSIP_AGGREGATION_SIZE_BYTES + .with_label_values(&["snappy"]) + .observe(snappy as f64); +} + +// --- Req/Resp Message Size Histograms --- +// +// `protocol` label: `"status"` or `"blocks_by_root"`. +// `compression` label: `"raw"` (SSZ) or `"snappy"` (on-wire, varint-prefixed +// snappy frame bytes only — the response-code byte is not included). + +static LEAN_REQRESP_REQUEST_SIZE_BYTES: LazyLock = LazyLock::new(|| { + register_histogram_vec!( + "lean_reqresp_request_size_bytes", + "Bytes size of a req/resp request", + &["protocol", "compression"], + vec![64.0, 128.0, 256.0, 512.0, 1024.0, 4096.0, 16384.0, 65536.0] + ) + .unwrap() +}); + +static LEAN_REQRESP_RESPONSE_CHUNK_SIZE_BYTES: LazyLock = LazyLock::new(|| { + register_histogram_vec!( + "lean_reqresp_response_chunk_size_bytes", + "Bytes size of a single req/resp response chunk", + &["protocol", "compression"], + vec![ + 128.0, + 1024.0, + 10_000.0, + 100_000.0, + 500_000.0, + 1_000_000.0, + 5_000_000.0, + 10_000_000.0 + ] + ) + .unwrap() +}); + +/// Observe the size of a req/resp request, recording both the raw SSZ size +/// and the snappy-compressed on-wire size. +pub fn observe_reqresp_request_size(protocol: &str, raw: usize, snappy: usize) { + LEAN_REQRESP_REQUEST_SIZE_BYTES + .with_label_values(&[protocol, "raw"]) + .observe(raw as f64); + LEAN_REQRESP_REQUEST_SIZE_BYTES + .with_label_values(&[protocol, "snappy"]) + .observe(snappy as f64); +} + +/// Observe the size of a single req/resp response chunk, recording both the +/// raw SSZ size and the snappy-compressed on-wire size. +pub fn observe_reqresp_response_chunk_size(protocol: &str, raw: usize, snappy: usize) { + LEAN_REQRESP_RESPONSE_CHUNK_SIZE_BYTES + .with_label_values(&[protocol, "raw"]) + .observe(raw as f64); + LEAN_REQRESP_RESPONSE_CHUNK_SIZE_BYTES + .with_label_values(&[protocol, "snappy"]) + .observe(snappy as f64); } /// Set the attestation committee subnet gauge. diff --git a/crates/net/p2p/src/req_resp/codec.rs b/crates/net/p2p/src/req_resp/codec.rs index e85f440a..43764a2e 100644 --- a/crates/net/p2p/src/req_resp/codec.rs +++ b/crates/net/p2p/src/req_resp/codec.rs @@ -12,8 +12,19 @@ use super::{ }, }; +use crate::metrics; use ethlambda_types::block::SignedBlock; +/// Short label extracted from a libp2p protocol id, used as the `protocol` +/// label on req/resp size metrics. +fn protocol_label(protocol: &str) -> &'static str { + match protocol { + STATUS_PROTOCOL_V1 => "status", + BLOCKS_BY_ROOT_PROTOCOL_V1 => "blocks_by_root", + _ => "unknown", + } +} + #[derive(Debug, Clone, Default)] pub struct Codec; @@ -30,7 +41,10 @@ impl libp2p::request_response::Codec for Codec { where T: AsyncRead + Unpin + Send, { - let payload = decode_payload(io).await?; + let decoded = decode_payload(io).await?; + let payload = decoded.uncompressed; + let label = protocol_label(protocol.as_ref()); + metrics::observe_reqresp_request_size(label, payload.len(), decoded.compressed_size); match protocol.as_ref() { STATUS_PROTOCOL_V1 => { @@ -60,9 +74,10 @@ impl libp2p::request_response::Codec for Codec { where T: AsyncRead + Unpin + Send, { + let label = protocol_label(protocol.as_ref()); match protocol.as_ref() { - STATUS_PROTOCOL_V1 => decode_status_response(io).await, - BLOCKS_BY_ROOT_PROTOCOL_V1 => decode_blocks_by_root_response(io).await, + STATUS_PROTOCOL_V1 => decode_status_response(io, label).await, + BLOCKS_BY_ROOT_PROTOCOL_V1 => decode_blocks_by_root_response(io, label).await, _ => Err(io::Error::new( io::ErrorKind::InvalidData, format!("unknown protocol: {}", protocol.as_ref()), @@ -72,7 +87,7 @@ impl libp2p::request_response::Codec for Codec { async fn write_request( &mut self, - _: &Self::Protocol, + protocol: &Self::Protocol, io: &mut T, req: Self::Request, ) -> io::Result<()> @@ -86,18 +101,22 @@ impl libp2p::request_response::Codec for Codec { Request::BlocksByRoot(request) => request.to_ssz(), }; - write_payload(io, &encoded).await + let compressed_size = write_payload(io, &encoded).await?; + let label = protocol_label(protocol.as_ref()); + metrics::observe_reqresp_request_size(label, encoded.len(), compressed_size); + Ok(()) } async fn write_response( &mut self, - _: &Self::Protocol, + protocol: &Self::Protocol, io: &mut T, resp: Self::Response, ) -> io::Result<()> where T: AsyncWrite + Unpin + Send, { + let label = protocol_label(protocol.as_ref()); match resp { Response::Success { payload } => { match &payload { @@ -105,7 +124,13 @@ impl libp2p::request_response::Codec for Codec { // Send success code (0) io.write_all(&[ResponseCode::SUCCESS.into()]).await?; let encoded = status.to_ssz(); - write_payload(io, &encoded).await + let compressed_size = write_payload(io, &encoded).await?; + metrics::observe_reqresp_response_chunk_size( + label, + encoded.len(), + compressed_size, + ); + Ok(()) } ResponsePayload::BlocksByRoot(blocks) => { // Write each block as a separate chunk. @@ -123,7 +148,12 @@ impl libp2p::request_response::Codec for Codec { continue; } io.write_all(&[ResponseCode::SUCCESS.into()]).await?; - write_payload(io, &encoded).await?; + let compressed_size = write_payload(io, &encoded).await?; + metrics::observe_reqresp_response_chunk_size( + label, + encoded.len(), + compressed_size, + ); } // Empty response if no blocks found (stream just ends) Ok(()) @@ -137,7 +167,8 @@ impl libp2p::request_response::Codec for Codec { // Error messages are SSZ-encoded as List[byte, 256] let encoded = message.to_ssz(); - write_payload(io, &encoded).await + write_payload(io, &encoded).await?; + Ok(()) } } } @@ -164,7 +195,7 @@ impl libp2p::request_response::Codec for Codec { /// - I/O error occurs while reading the response code or payload /// - Peer's error message cannot be SSZ-decoded (InvalidData) /// - Peer's Status payload cannot be SSZ-decoded (InvalidData) -async fn decode_status_response(io: &mut T) -> io::Result +async fn decode_status_response(io: &mut T, protocol_label: &str) -> io::Result where T: AsyncRead + Unpin + Send, { @@ -173,7 +204,13 @@ where .await?; let code = ResponseCode::from(result_byte); - let payload = decode_payload(io).await?; + let decoded = decode_payload(io).await?; + let payload = decoded.uncompressed; + metrics::observe_reqresp_response_chunk_size( + protocol_label, + payload.len(), + decoded.compressed_size, + ); if code != ResponseCode::SUCCESS { let message = ErrorMessage::from_ssz_bytes(&payload).map_err(|err| { @@ -215,7 +252,7 @@ where /// /// Note: Error chunks from the peer (non-SUCCESS response codes) do not cause this /// function to return `Err` - they are logged and skipped. -async fn decode_blocks_by_root_response(io: &mut T) -> io::Result +async fn decode_blocks_by_root_response(io: &mut T, protocol_label: &str) -> io::Result where T: AsyncRead + Unpin + Send, { @@ -232,7 +269,13 @@ where } let code = ResponseCode::from(result_byte); - let payload = decode_payload(io).await?; + let decoded = decode_payload(io).await?; + let payload = decoded.uncompressed; + metrics::observe_reqresp_response_chunk_size( + protocol_label, + payload.len(), + decoded.compressed_size, + ); if code != ResponseCode::SUCCESS { let error_message = ErrorMessage::from_ssz_bytes(&payload) diff --git a/crates/net/p2p/src/req_resp/encoding.rs b/crates/net/p2p/src/req_resp/encoding.rs index 7a4116c4..1756a0ec 100644 --- a/crates/net/p2p/src/req_resp/encoding.rs +++ b/crates/net/p2p/src/req_resp/encoding.rs @@ -8,8 +8,15 @@ pub const MAX_PAYLOAD_SIZE: usize = 10 * 1024 * 1024; // 10 MB // https://github.com/ethereum/consensus-specs/blob/master/specs/phase0/p2p-interface.md#max_message_size pub const MAX_COMPRESSED_PAYLOAD_SIZE: usize = 32 + MAX_PAYLOAD_SIZE + MAX_PAYLOAD_SIZE / 6 + 1024; // ~12 MB +/// Decoded payload together with the size of its on-wire snappy-compressed +/// bytes (excluding the varint length prefix). +pub struct DecodedPayload { + pub uncompressed: Vec, + pub compressed_size: usize, +} + /// Decode a varint-prefixed, snappy-compressed SSZ payload from an async reader. -pub async fn decode_payload(io: &mut T) -> io::Result> +pub async fn decode_payload(io: &mut T) -> io::Result where T: AsyncRead + Unpin + Send, { @@ -26,6 +33,7 @@ where )); } let (size, rest) = decode_varint(&buf)?; + let compressed_size = rest.len(); if size as usize > MAX_PAYLOAD_SIZE { return Err(io::Error::new( @@ -44,10 +52,15 @@ where )); } - Ok(uncompressed) + Ok(DecodedPayload { + uncompressed, + compressed_size, + }) } -pub async fn write_payload(io: &mut T, encoded: &[u8]) -> io::Result<()> +/// Write a varint-prefixed, snappy-compressed SSZ payload. Returns the size +/// of the snappy-compressed bytes (excluding the varint length prefix). +pub async fn write_payload(io: &mut T, encoded: &[u8]) -> io::Result where T: AsyncWrite + Unpin, { @@ -70,7 +83,7 @@ where io.write_all(varint_buf).await?; io.write_all(&buf).await?; - Ok(()) + Ok(buf.len()) } /// Encodes a u32 as a varint into the provided buffer, returning a slice of the buffer diff --git a/docs/metrics.md b/docs/metrics.md index 3c956fd8..0d331d88 100644 --- a/docs/metrics.md +++ b/docs/metrics.md @@ -29,6 +29,7 @@ The exposed metrics follow [the leanMetrics specification](https://github.com/le | `lean_pq_sig_attestations_in_aggregated_signatures_total` | Counter | Total number of attestations included into aggregated signatures | On aggregated signature production | | | ✅ | | `lean_pq_sig_aggregated_signatures_building_time_seconds` | Histogram | Time taken to build an aggregated attestation signature | On aggregated signature production | | 0.1, 0.25, 0.5, 0.75, 1, 1.25, 1.5, 2, 4 | ✅ | | `lean_pq_sig_aggregated_signatures_verification_time_seconds` | Histogram | Time taken to verify an aggregated attestation signature | On aggregated signature verification | | 0.1, 0.25, 0.5, 0.75, 1, 1.25, 1.5, 2, 4 | ✅ | +| `lean_aggregated_proof_size_bytes` | Histogram | Bytes size of an aggregated signature proof's `proof_data` field | On aggregated signature production | | 1024, 4096, 16384, 65536, 131072, 262144, 524288, 1048576 | ✅ | ## Fork-Choice Metrics @@ -79,6 +80,11 @@ The exposed metrics follow [the leanMetrics specification](https://github.com/le |`lean_connected_peers`| Gauge | Number of connected peers | On scrape | client=ethlambda,grandine,lantern,lighthouse,qlean,ream,zeam | ✅(*) | |`lean_peer_connection_events_total`| Counter | Total number of peer connection events | On peer connection | direction=inbound,outbound
result=success,timeout,error | ✅ | |`lean_peer_disconnection_events_total`| Counter | Total number of peer disconnection events | On peer disconnection | direction=inbound,outbound
reason=timeout,remote_close,local_close,error | ✅ | +|`lean_gossip_block_size_bytes`| Histogram | Bytes size of a gossip block message (raw SSZ or snappy on-wire) | On gossip block send/receive | compression=raw,snappy | ✅ | +|`lean_gossip_attestation_size_bytes`| Histogram | Bytes size of a gossip attestation message (raw SSZ or snappy on-wire) | On gossip attestation send/receive | compression=raw,snappy | ✅ | +|`lean_gossip_aggregation_size_bytes`| Histogram | Bytes size of a gossip aggregated attestation message (raw SSZ or snappy on-wire) | On gossip aggregation send/receive | compression=raw,snappy | ✅ | +|`lean_reqresp_request_size_bytes`| Histogram | Bytes size of a req/resp request (raw SSZ or snappy on-wire) | On req/resp request send/receive | protocol=status,blocks_by_root
compression=raw,snappy | ✅ | +|`lean_reqresp_response_chunk_size_bytes`| Histogram | Bytes size of a single req/resp response chunk (raw SSZ or snappy on-wire) | On req/resp response chunk send/receive | protocol=status,blocks_by_root
compression=raw,snappy | ✅ | ---