Skip to content

Commit

Permalink
Merge pull request #5749 from sigp/electra_op_pool
Browse files Browse the repository at this point in the history
Optimise Electra op pool aggregation
  • Loading branch information
realbigsean authored May 10, 2024
2 parents e448557 + 72548cb commit 7926afe
Show file tree
Hide file tree
Showing 4 changed files with 243 additions and 51 deletions.
7 changes: 7 additions & 0 deletions beacon_node/operation_pool/src/attestation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,13 @@ impl<'a, E: EthSpec> MaxCover for AttMaxCover<'a, E> {
/// because including two attestations on chain to satisfy different participation bits is
/// impossible without the validator double voting. I.e. it is only suboptimal in the presence
/// of slashable voting, which is rare.
///
/// Post-Electra this optimisation is still OK. The `self.att.data.index` will always be 0 for
/// all Electra attestations, so when a new attestation is added to the solution, we will
/// remove its validators from all attestations at the same slot. It may happen that the
/// included attestation and the attestation being updated have no validators in common, in
/// which case the `retain` will be a no-op. We could consider optimising this in future by only
/// executing the `retain` when the `committee_bits` of the two attestations intersect.
fn update_covering_set(
&mut self,
best_att: &AttestationRef<'a, E>,
Expand Down
196 changes: 162 additions & 34 deletions beacon_node/operation_pool/src/attestation_storage.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use crate::AttestationStats;
use itertools::Itertools;
use std::collections::HashMap;
use std::collections::{BTreeMap, HashMap};
use types::{
attestation::{AttestationBase, AttestationElectra},
superstruct, AggregateSignature, Attestation, AttestationData, BeaconState, BitList, BitVector,
Checkpoint, Epoch, EthSpec, Hash256, Slot,
Checkpoint, Epoch, EthSpec, Hash256, Slot, Unsigned,
};

#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy)]
Expand All @@ -30,7 +30,6 @@ pub struct CompactIndexedAttestation<E: EthSpec> {
#[superstruct(only(Electra), partial_getter(rename = "aggregation_bits_electra"))]
pub aggregation_bits: BitList<E::MaxValidatorsPerSlot>,
pub signature: AggregateSignature,
pub index: u64,
#[superstruct(only(Electra))]
pub committee_bits: BitVector<E::MaxCommitteesPerSlot>,
}
Expand All @@ -42,6 +41,7 @@ pub struct SplitAttestation<E: EthSpec> {
pub indexed: CompactIndexedAttestation<E>,
}

// TODO(electra): rename this type
#[derive(Debug, Clone)]
pub struct AttestationRef<'a, E: EthSpec> {
pub checkpoint: &'a CheckpointKey,
Expand Down Expand Up @@ -78,15 +78,13 @@ impl<E: EthSpec> SplitAttestation<E> {
attesting_indices,
aggregation_bits: attn.aggregation_bits,
signature: attestation.signature().clone(),
index: data.index,
})
}
Attestation::Electra(attn) => {
CompactIndexedAttestation::Electra(CompactIndexedAttestationElectra {
attesting_indices,
aggregation_bits: attn.aggregation_bits,
signature: attestation.signature().clone(),
index: data.index,
committee_bits: attn.committee_bits,
})
}
Expand Down Expand Up @@ -159,15 +157,15 @@ impl CheckpointKey {
}

impl<E: EthSpec> CompactIndexedAttestation<E> {
pub fn signers_disjoint_from(&self, other: &Self) -> bool {
pub fn should_aggregate(&self, other: &Self) -> bool {
match (self, other) {
(CompactIndexedAttestation::Base(this), CompactIndexedAttestation::Base(other)) => {
this.signers_disjoint_from(other)
this.should_aggregate(other)
}
(
CompactIndexedAttestation::Electra(this),
CompactIndexedAttestation::Electra(other),
) => this.signers_disjoint_from(other),
) => this.should_aggregate(other),
// TODO(electra) is a mix of electra and base compact indexed attestations an edge case we need to deal with?
_ => false,
}
Expand All @@ -181,15 +179,15 @@ impl<E: EthSpec> CompactIndexedAttestation<E> {
(
CompactIndexedAttestation::Electra(this),
CompactIndexedAttestation::Electra(other),
) => this.aggregate(other),
) => this.aggregate_same_committee(other),
// TODO(electra) is a mix of electra and base compact indexed attestations an edge case we need to deal with?
_ => (),
}
}
}

impl<E: EthSpec> CompactIndexedAttestationBase<E> {
pub fn signers_disjoint_from(&self, other: &Self) -> bool {
pub fn should_aggregate(&self, other: &Self) -> bool {
self.aggregation_bits
.intersection(&other.aggregation_bits)
.is_zero()
Expand All @@ -208,24 +206,82 @@ impl<E: EthSpec> CompactIndexedAttestationBase<E> {
}

impl<E: EthSpec> CompactIndexedAttestationElectra<E> {
// TODO(electra) update to match spec requirements
pub fn signers_disjoint_from(&self, other: &Self) -> bool {
self.aggregation_bits
.intersection(&other.aggregation_bits)
.is_zero()
pub fn should_aggregate(&self, other: &Self) -> bool {
// For Electra, only aggregate attestations in the same committee.
self.committee_bits == other.committee_bits
&& self
.aggregation_bits
.intersection(&other.aggregation_bits)
.is_zero()
}

// TODO(electra) update to match spec requirements
pub fn aggregate(&mut self, other: &Self) {
pub fn aggregate_same_committee(&mut self, other: &Self) {
// TODO(electra): remove assert in favour of Result
assert_eq!(self.committee_bits, other.committee_bits);
self.aggregation_bits = self.aggregation_bits.union(&other.aggregation_bits);
self.attesting_indices = self
.attesting_indices
.drain(..)
.merge(other.attesting_indices.iter().copied())
.dedup()
.collect();
self.signature.add_assign_aggregate(&other.signature);
}

pub fn aggregate_with_disjoint_committees(&mut self, other: &Self) {
// TODO(electra): remove asserts or use Result
assert!(self
.committee_bits
.intersection(&other.committee_bits)
.is_zero(),);
// The attestation being aggregated in must only have 1 committee bit set.
assert_eq!(other.committee_bits.num_set_bits(), 1);
// Check we are aggregating in increasing committee index order (so we can append
// aggregation bits).
assert!(self.committee_bits.highest_set_bit() < other.committee_bits.highest_set_bit());

self.committee_bits = self.committee_bits.union(&other.committee_bits);
self.aggregation_bits =
bitlist_extend(&self.aggregation_bits, &other.aggregation_bits).unwrap();
self.attesting_indices = self
.attesting_indices
.drain(..)
.merge(other.attesting_indices.iter().copied())
.dedup()
.collect();
self.aggregation_bits = self.aggregation_bits.union(&other.aggregation_bits);
self.signature.add_assign_aggregate(&other.signature);
}

pub fn committee_index(&self) -> u64 {
*self.get_committee_indices().first().unwrap_or(&0u64)
}

pub fn get_committee_indices(&self) -> Vec<u64> {
self.committee_bits
.iter()
.enumerate()
.filter_map(|(index, bit)| if bit { Some(index as u64) } else { None })
.collect()
}
}

// TODO(electra): upstream this or a more efficient implementation
fn bitlist_extend<N: Unsigned>(list1: &BitList<N>, list2: &BitList<N>) -> Option<BitList<N>> {
let new_length = list1.len() + list2.len();
let mut list = BitList::<N>::with_capacity(new_length).ok()?;

// Copy bits from list1.
for (i, bit) in list1.iter().enumerate() {
list.set(i, bit).ok()?;
}

// Copy bits from list2, starting from the end of list1.
let offset = list1.len();
for (i, bit) in list2.iter().enumerate() {
list.set(offset + i, bit).ok()?;
}

Some(list)
}

impl<E: EthSpec> AttestationMap<E> {
Expand All @@ -239,34 +295,106 @@ impl<E: EthSpec> AttestationMap<E> {
let attestation_map = self.checkpoint_map.entry(checkpoint).or_default();
let attestations = attestation_map.attestations.entry(data).or_default();

// TODO(electra):
// Greedily aggregate the attestation with all existing attestations.
// NOTE: this is sub-optimal and in future we will remove this in favour of max-clique
// aggregation.
let mut aggregated = false;

match attestation {
Attestation::Base(_) => {
for existing_attestation in attestations.iter_mut() {
if existing_attestation.signers_disjoint_from(&indexed) {
existing_attestation.aggregate(&indexed);
aggregated = true;
} else if *existing_attestation == indexed {
aggregated = true;
}
}
for existing_attestation in attestations.iter_mut() {
if existing_attestation.should_aggregate(&indexed) {
existing_attestation.aggregate(&indexed);
aggregated = true;
} else if *existing_attestation == indexed {
aggregated = true;
}
// TODO(electra) in order to be devnet ready, we can skip
// aggregating here for now. this will result in "poorly"
// constructed blocks, but that should be fine for devnet
Attestation::Electra(_) => (),
};
}

if !aggregated {
attestations.push(indexed);
}
}

/// Aggregate Electra attestations for the same attestation data signed by different
/// committees.
///
/// Non-Electra attestations are left as-is.
pub fn aggregate_across_committees(&mut self, checkpoint_key: CheckpointKey) {
let Some(attestation_map) = self.checkpoint_map.get_mut(&checkpoint_key) else {
return;
};
for compact_indexed_attestations in attestation_map.attestations.values_mut() {
let unaggregated_attestations = std::mem::take(compact_indexed_attestations);
let mut aggregated_attestations: Vec<CompactIndexedAttestation<E>> = vec![];

// Aggregate the best attestations for each committee and leave the rest.
let mut best_attestations_by_committee: BTreeMap<
u64,
CompactIndexedAttestationElectra<E>,
> = BTreeMap::new();

for committee_attestation in unaggregated_attestations {
let mut electra_attestation = match committee_attestation {
CompactIndexedAttestation::Electra(att)
if att.committee_bits.num_set_bits() == 1 =>
{
att
}
CompactIndexedAttestation::Electra(att) => {
// Aggregate already covers multiple committees, leave it as-is.
aggregated_attestations.push(CompactIndexedAttestation::Electra(att));
continue;
}
CompactIndexedAttestation::Base(att) => {
// Leave as-is.
aggregated_attestations.push(CompactIndexedAttestation::Base(att));
continue;
}
};
let committee_index = electra_attestation.committee_index();
if let Some(existing_attestation) =
best_attestations_by_committee.get_mut(&committee_index)
{
// Search for the best (most aggregation bits) attestation for this committee
// index.
if electra_attestation.aggregation_bits.num_set_bits()
> existing_attestation.aggregation_bits.num_set_bits()
{
// New attestation is better than the previously known one for this
// committee. Replace it.
std::mem::swap(existing_attestation, &mut electra_attestation);
}
// Put the inferior attestation into the list of aggregated attestations
// without performing any cross-committee aggregation.
aggregated_attestations
.push(CompactIndexedAttestation::Electra(electra_attestation));
} else {
// First attestation seen for this committee. Place it in the map
// provisionally.
best_attestations_by_committee.insert(committee_index, electra_attestation);
}
}

if let Some(on_chain_aggregate) =
Self::compute_on_chain_aggregate(best_attestations_by_committee)
{
aggregated_attestations
.push(CompactIndexedAttestation::Electra(on_chain_aggregate));
}

*compact_indexed_attestations = aggregated_attestations;
}
}

pub fn compute_on_chain_aggregate(
mut attestations_by_committee: BTreeMap<u64, CompactIndexedAttestationElectra<E>>,
) -> Option<CompactIndexedAttestationElectra<E>> {
let (_, mut on_chain_aggregate) = attestations_by_committee.pop_first()?;
for (_, attestation) in attestations_by_committee {
on_chain_aggregate.aggregate_with_disjoint_committees(&attestation);
}
Some(on_chain_aggregate)
}

/// Iterate all attestations matching the given `checkpoint_key`.
pub fn get_attestations<'a>(
&'a self,
Expand Down
Loading

0 comments on commit 7926afe

Please sign in to comment.