diff --git a/crates/common/types/src/attestation.rs b/crates/common/types/src/attestation.rs index 254ac16..b619fe3 100644 --- a/crates/common/types/src/attestation.rs +++ b/crates/common/types/src/attestation.rs @@ -86,6 +86,22 @@ pub fn validator_indices(bits: &AggregationBits) -> impl Iterator + }) } +/// Returns `true` iff every bit set in `a` is also set in `b` (i.e., `a` is a subset of `b`). +pub fn bits_is_subset(a: &AggregationBits, b: &AggregationBits) -> bool { + let a_bytes = a.as_bytes(); + let b_bytes = b.as_bytes(); + for (i, &a_byte) in a_bytes.iter().enumerate() { + if a_byte == 0 { + continue; + } + let b_byte = b_bytes.get(i).copied().unwrap_or(0); + if a_byte & !b_byte != 0 { + return false; + } + } + true +} + /// Aggregated attestation with its signature proof, used for gossip on the aggregation topic. #[derive(Debug, Clone, SszEncode, SszDecode, HashTreeRoot)] pub struct SignedAggregatedAttestation { diff --git a/crates/storage/src/store.rs b/crates/storage/src/store.rs index 4a3d4fd..7ac7db5 100644 --- a/crates/storage/src/store.rs +++ b/crates/storage/src/store.rs @@ -10,7 +10,7 @@ static EMPTY_BODY_ROOT: LazyLock = LazyLock::new(|| BlockBody::default().h use crate::api::{StorageBackend, StorageWriteBatch, Table}; use ethlambda_types::{ - attestation::{AttestationData, HashedAttestationData}, + attestation::{AttestationData, HashedAttestationData, bits_is_subset}, block::{ AggregatedSignatureProof, Block, BlockBody, BlockHeader, BlockSignatures, SignedBlock, }, @@ -126,18 +126,38 @@ impl PayloadBuffer { } } - /// Insert proofs for an attestation, FIFO-evicting oldest data_roots when total proofs reach capacity. + /// Insert a proof for an attestation, FIFO-evicting oldest data_roots + /// when total proofs reach capacity. Also ensures the buffer doesn't + /// include proofs which are a subset of other proofs for the same + /// attestation data: + /// + /// - If the incoming proof's participants are a subset (incl.
equal) of + any existing proof, the incoming proof is redundant and skipped. + - Otherwise, any existing proof whose participants are a strict subset + of the incoming proof's is removed before inserting. fn push(&mut self, hashed: HashedAttestationData, proof: AggregatedSignatureProof) { let (data_root, att_data) = hashed.into_parts(); + if let Some(entry) = self.data.get_mut(&data_root) { - // Skip duplicate proofs (same participants) - if entry - .proofs - .iter() - .any(|p| p.participants == proof.participants) - { - return; + let mut to_remove: Vec = Vec::new(); + for (i, p) in entry.proofs.iter().enumerate() { + // Incoming is subsumed by an existing proof (incl. equal). Skip. + if bits_is_subset(&proof.participants, &p.participants) { + return; + } + // Existing is a strict subset of incoming. Mark for removal. + // (Non-strict equality was ruled out by the check above.) + if bits_is_subset(&p.participants, &proof.participants) { + to_remove.push(i); + } } + + // Remove subsumed proofs (reverse order so earlier indices stay valid). + for i in to_remove.into_iter().rev() { + entry.proofs.swap_remove(i); + self.total_proofs -= 1; + } + entry.proofs.push(proof); self.total_proofs += 1; } else { @@ -1605,6 +1625,17 @@ mod tests { AggregatedSignatureProof::empty(bits) } + /// Create a proof with bits set for every validator in `vids`.
+ fn make_proof_for_validators(vids: &[u64]) -> AggregatedSignatureProof { + use ethlambda_types::attestation::AggregationBits; + let max = vids.iter().copied().max().unwrap_or(0) as usize; + let mut bits = AggregationBits::with_length(max + 1).unwrap(); + for &v in vids { + bits.set(v as usize, true).unwrap(); + } + AggregatedSignatureProof::empty(bits) + } + fn make_att_data(slot: u64) -> AttestationData { AttestationData { slot, @@ -1728,6 +1759,242 @@ mod tests { assert_eq!(cloned.known_payloads.lock().unwrap().len(), 1); } + #[test] + fn payload_buffer_push_superset_removes_strict_subset() { + let mut buf = PayloadBuffer::new(10); + let data = make_att_data(1); + let data_root = data.hash_tree_root(); + + buf.push( + HashedAttestationData::new(data.clone()), + make_proof_for_validators(&[1, 2]), + ); + buf.push( + HashedAttestationData::new(data), + make_proof_for_validators(&[1, 2, 3]), + ); + + assert_eq!(buf.total_proofs, 1); + assert_eq!(buf.data[&data_root].proofs.len(), 1); + let kept: HashSet = buf.data[&data_root].proofs[0] + .participant_indices() + .collect(); + assert_eq!(kept, HashSet::from([1, 2, 3])); + } + + #[test] + fn payload_buffer_push_subset_is_skipped() { + let mut buf = PayloadBuffer::new(10); + let data = make_att_data(1); + let data_root = data.hash_tree_root(); + + buf.push( + HashedAttestationData::new(data.clone()), + make_proof_for_validators(&[1, 2, 3]), + ); + buf.push( + HashedAttestationData::new(data), + make_proof_for_validators(&[1, 2]), + ); + + assert_eq!(buf.total_proofs, 1); + assert_eq!(buf.data[&data_root].proofs.len(), 1); + let kept: HashSet = buf.data[&data_root].proofs[0] + .participant_indices() + .collect(); + assert_eq!(kept, HashSet::from([1, 2, 3])); + } + + #[test] + fn payload_buffer_push_equal_participants_is_skipped() { + let mut buf = PayloadBuffer::new(10); + let data = make_att_data(1); + let data_root = data.hash_tree_root(); + + buf.push( + HashedAttestationData::new(data.clone()), +
make_proof_for_validators(&[1, 2]), + ); + buf.push( + HashedAttestationData::new(data), + make_proof_for_validators(&[1, 2]), + ); + + assert_eq!(buf.total_proofs, 1); + assert_eq!(buf.data[&data_root].proofs.len(), 1); + } + + #[test] + fn payload_buffer_push_incomparable_proofs_coexist() { + let mut buf = PayloadBuffer::new(10); + let data = make_att_data(1); + let data_root = data.hash_tree_root(); + + buf.push( + HashedAttestationData::new(data.clone()), + make_proof_for_validators(&[1, 2]), + ); + buf.push( + HashedAttestationData::new(data), + make_proof_for_validators(&[3, 4]), + ); + + assert_eq!(buf.total_proofs, 2); + assert_eq!(buf.data[&data_root].proofs.len(), 2); + } + + #[test] + fn payload_buffer_push_superset_absorbs_multiple_subsets() { + let mut buf = PayloadBuffer::new(10); + let data = make_att_data(1); + let data_root = data.hash_tree_root(); + + // Three pairwise-incomparable singletons: all retained. + buf.push( + HashedAttestationData::new(data.clone()), + make_proof_for_validators(&[1]), + ); + buf.push( + HashedAttestationData::new(data.clone()), + make_proof_for_validators(&[2]), + ); + buf.push( + HashedAttestationData::new(data.clone()), + make_proof_for_validators(&[3]), + ); + assert_eq!(buf.total_proofs, 3); + + // Superset push absorbs all three at once. + buf.push( + HashedAttestationData::new(data), + make_proof_for_validators(&[1, 2, 3]), + ); + + assert_eq!(buf.total_proofs, 1); + assert_eq!(buf.data[&data_root].proofs.len(), 1); + // `order` still contains the single entry.
+ assert_eq!(buf.order.len(), 1); + assert_eq!(buf.order.front().copied(), Some(data_root)); + } + + #[test] + fn payload_buffer_push_mixed_kept_and_removed() { + let mut buf = PayloadBuffer::new(10); + let data = make_att_data(1); + let data_root = data.hash_tree_root(); + + buf.push( + HashedAttestationData::new(data.clone()), + make_proof_for_validators(&[1, 2]), + ); + buf.push( + HashedAttestationData::new(data.clone()), + make_proof_for_validators(&[5, 6]), + ); + buf.push( + HashedAttestationData::new(data), + make_proof_for_validators(&[1, 2, 3]), + ); + + assert_eq!(buf.total_proofs, 2); + + let sets: HashSet> = buf.data[&data_root] + .proofs + .iter() + .map(|p| { + let mut v: Vec = p.participant_indices().collect(); + v.sort_unstable(); + v + }) + .collect(); + assert!(sets.contains(&vec![5, 6])); + assert!(sets.contains(&vec![1, 2, 3])); + } + + #[test] + fn payload_buffer_push_empty_participants_subsumed_by_anything() { + let mut buf = PayloadBuffer::new(10); + let data = make_att_data(1); + let data_root = data.hash_tree_root(); + + // Empty-participant proof inserted first: anything that follows absorbs it. + buf.push( + HashedAttestationData::new(data.clone()), + make_proof_for_validators(&[]), + ); + assert_eq!(buf.total_proofs, 1); + buf.push( + HashedAttestationData::new(data.clone()), + make_proof_for_validators(&[1, 2]), + ); + assert_eq!(buf.total_proofs, 1); + assert_eq!( + buf.data[&data_root].proofs[0] + .participant_indices() + .collect::>(), + vec![1, 2] + ); + + // Empty-participant proof pushed against existing non-empty: incoming is subsumed, skipped.
+ buf.push( + HashedAttestationData::new(data), + make_proof_for_validators(&[]), + ); + assert_eq!(buf.total_proofs, 1); + } + + #[test] + fn payload_buffer_push_cross_data_root_independence() { + let mut buf = PayloadBuffer::new(10); + let data_a = make_att_data(1); + let data_b = make_att_data(2); + let root_a = data_a.hash_tree_root(); + let root_b = data_b.hash_tree_root(); + + buf.push( + HashedAttestationData::new(data_a), + make_proof_for_validators(&[1, 2, 3]), + ); + buf.push( + HashedAttestationData::new(data_b), + make_proof_for_validators(&[1, 2]), + ); + + // Different data_roots → no cross-entry subsumption. + assert_eq!(buf.total_proofs, 2); + assert_eq!(buf.data[&root_a].proofs.len(), 1); + assert_eq!(buf.data[&root_b].proofs.len(), 1); + } + + #[test] + fn payload_buffer_push_fifo_eviction_uses_total_proofs() { + let mut buf = PayloadBuffer::new(2); + let data_a = make_att_data(1); + let data_b = make_att_data(2); + let data_c = make_att_data(3); + let root_a = data_a.hash_tree_root(); + let root_c = data_c.hash_tree_root(); + + buf.push( + HashedAttestationData::new(data_a), + make_proof_for_validators(&[1]), + ); + buf.push( + HashedAttestationData::new(data_b), + make_proof_for_validators(&[2, 3]), + ); + // total_proofs == 3, over capacity → evict oldest (root_a). + // Pushing a third distinct data_root triggers eviction via capacity. + buf.push( + HashedAttestationData::new(data_c), + make_proof_for_validators(&[4]), + ); + + assert!(!buf.data.contains_key(&root_a)); + assert!(buf.data.contains_key(&root_c)); + assert_eq!(buf.total_proofs, 2); + } + // ============ GossipSignatureBuffer Tests ============ fn make_dummy_sig() -> ValidatorSignature {