Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimise Electra op pool aggregation #5749

Merged
merged 8 commits into from
May 10, 2024
Merged
7 changes: 7 additions & 0 deletions beacon_node/operation_pool/src/attestation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,13 @@ impl<'a, E: EthSpec> MaxCover for AttMaxCover<'a, E> {
/// because including two attestations on chain to satisfy different participation bits is
/// impossible without the validator double voting. I.e. it is only suboptimal in the presence
/// of slashable voting, which is rare.
///
/// Post-Electra this optimisation is still OK. The `self.att.data.index` will always be 0 for
/// all Electra attestations, so when a new attestation is added to the solution, we will
/// remove its validators from all attestations at the same slot. It may happen that the
/// included attestation and the attestation being updated have no validators in common, in
/// which case the `retain` will be a no-op. We could consider optimising this in future by only
/// executing the `retain` when the `committee_bits` of the two attestations intersect.
fn update_covering_set(
&mut self,
best_att: &AttestationRef<'a, E>,
Expand Down
196 changes: 162 additions & 34 deletions beacon_node/operation_pool/src/attestation_storage.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use crate::AttestationStats;
use itertools::Itertools;
use std::collections::HashMap;
use std::collections::{BTreeMap, HashMap};
use types::{
attestation::{AttestationBase, AttestationElectra},
superstruct, AggregateSignature, Attestation, AttestationData, BeaconState, BitList, BitVector,
Checkpoint, Epoch, EthSpec, Hash256, Slot,
Checkpoint, Epoch, EthSpec, Hash256, Slot, Unsigned,
};

#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy)]
Expand All @@ -30,7 +30,6 @@ pub struct CompactIndexedAttestation<E: EthSpec> {
#[superstruct(only(Electra), partial_getter(rename = "aggregation_bits_electra"))]
pub aggregation_bits: BitList<E::MaxValidatorsPerSlot>,
pub signature: AggregateSignature,
pub index: u64,
#[superstruct(only(Electra))]
pub committee_bits: BitVector<E::MaxCommitteesPerSlot>,
}
Expand All @@ -42,6 +41,7 @@ pub struct SplitAttestation<E: EthSpec> {
pub indexed: CompactIndexedAttestation<E>,
}

// TODO(electra): rename this type
#[derive(Debug, Clone)]
pub struct AttestationRef<'a, E: EthSpec> {
pub checkpoint: &'a CheckpointKey,
Expand Down Expand Up @@ -78,15 +78,13 @@ impl<E: EthSpec> SplitAttestation<E> {
attesting_indices,
aggregation_bits: attn.aggregation_bits,
signature: attestation.signature().clone(),
index: data.index,
})
}
Attestation::Electra(attn) => {
CompactIndexedAttestation::Electra(CompactIndexedAttestationElectra {
attesting_indices,
aggregation_bits: attn.aggregation_bits,
signature: attestation.signature().clone(),
index: data.index,
committee_bits: attn.committee_bits,
})
}
Expand Down Expand Up @@ -159,15 +157,15 @@ impl CheckpointKey {
}

impl<E: EthSpec> CompactIndexedAttestation<E> {
pub fn signers_disjoint_from(&self, other: &Self) -> bool {
pub fn should_aggregate(&self, other: &Self) -> bool {
match (self, other) {
(CompactIndexedAttestation::Base(this), CompactIndexedAttestation::Base(other)) => {
this.signers_disjoint_from(other)
this.should_aggregate(other)
}
(
CompactIndexedAttestation::Electra(this),
CompactIndexedAttestation::Electra(other),
) => this.signers_disjoint_from(other),
) => this.should_aggregate(other),
// TODO(electra) is a mix of electra and base compact indexed attestations an edge case we need to deal with?
_ => false,
}
Expand All @@ -181,15 +179,15 @@ impl<E: EthSpec> CompactIndexedAttestation<E> {
(
CompactIndexedAttestation::Electra(this),
CompactIndexedAttestation::Electra(other),
) => this.aggregate(other),
) => this.aggregate_same_committee(other),
// TODO(electra) is a mix of electra and base compact indexed attestations an edge case we need to deal with?
_ => (),
}
}
}

impl<E: EthSpec> CompactIndexedAttestationBase<E> {
pub fn signers_disjoint_from(&self, other: &Self) -> bool {
pub fn should_aggregate(&self, other: &Self) -> bool {
self.aggregation_bits
.intersection(&other.aggregation_bits)
.is_zero()
Expand All @@ -208,24 +206,82 @@ impl<E: EthSpec> CompactIndexedAttestationBase<E> {
}

impl<E: EthSpec> CompactIndexedAttestationElectra<E> {
// TODO(electra) update to match spec requirements
pub fn signers_disjoint_from(&self, other: &Self) -> bool {
self.aggregation_bits
.intersection(&other.aggregation_bits)
.is_zero()
pub fn should_aggregate(&self, other: &Self) -> bool {
// For Electra, only aggregate attestations in the same committee.
self.committee_bits == other.committee_bits
&& self
.aggregation_bits
.intersection(&other.aggregation_bits)
.is_zero()
}

// TODO(electra) update to match spec requirements
pub fn aggregate(&mut self, other: &Self) {
pub fn aggregate_same_committee(&mut self, other: &Self) {
// TODO(electra): remove assert in favour of Result
assert_eq!(self.committee_bits, other.committee_bits);
self.aggregation_bits = self.aggregation_bits.union(&other.aggregation_bits);
self.attesting_indices = self
.attesting_indices
.drain(..)
.merge(other.attesting_indices.iter().copied())
.dedup()
.collect();
self.signature.add_assign_aggregate(&other.signature);
}

pub fn aggregate_with_disjoint_committees(&mut self, other: &Self) {
// TODO(electra): remove asserts or use Result
assert!(self
.committee_bits
.intersection(&other.committee_bits)
.is_zero(),);
// The attestation being aggregated in must only have 1 committee bit set.
assert_eq!(other.committee_bits.num_set_bits(), 1);
// Check we are aggregating in increasing committee index order (so we can append
// aggregation bits).
assert!(self.committee_bits.highest_set_bit() < other.committee_bits.highest_set_bit());

self.committee_bits = self.committee_bits.union(&other.committee_bits);
self.aggregation_bits =
bitlist_extend(&self.aggregation_bits, &other.aggregation_bits).unwrap();
self.attesting_indices = self
.attesting_indices
.drain(..)
.merge(other.attesting_indices.iter().copied())
.dedup()
.collect();
self.aggregation_bits = self.aggregation_bits.union(&other.aggregation_bits);
self.signature.add_assign_aggregate(&other.signature);
}

pub fn committee_index(&self) -> u64 {
*self.get_committee_indices().first().unwrap_or(&0u64)
}

pub fn get_committee_indices(&self) -> Vec<u64> {
self.committee_bits
.iter()
.enumerate()
.filter_map(|(index, bit)| if bit { Some(index as u64) } else { None })
.collect()
}
}

// TODO(electra): upstream this or a more efficient implementation
fn bitlist_extend<N: Unsigned>(list1: &BitList<N>, list2: &BitList<N>) -> Option<BitList<N>> {
let new_length = list1.len() + list2.len();
let mut list = BitList::<N>::with_capacity(new_length).ok()?;

// Copy bits from list1.
for (i, bit) in list1.iter().enumerate() {
list.set(i, bit).ok()?;
}

// Copy bits from list2, starting from the end of list1.
let offset = list1.len();
for (i, bit) in list2.iter().enumerate() {
list.set(offset + i, bit).ok()?;
}

Some(list)
}

impl<E: EthSpec> AttestationMap<E> {
Expand All @@ -239,34 +295,106 @@ impl<E: EthSpec> AttestationMap<E> {
let attestation_map = self.checkpoint_map.entry(checkpoint).or_default();
let attestations = attestation_map.attestations.entry(data).or_default();

// TODO(electra):
// Greedily aggregate the attestation with all existing attestations.
// NOTE: this is sub-optimal and in future we will remove this in favour of max-clique
// aggregation.
let mut aggregated = false;

match attestation {
Attestation::Base(_) => {
for existing_attestation in attestations.iter_mut() {
if existing_attestation.signers_disjoint_from(&indexed) {
existing_attestation.aggregate(&indexed);
aggregated = true;
} else if *existing_attestation == indexed {
aggregated = true;
}
}
for existing_attestation in attestations.iter_mut() {
if existing_attestation.should_aggregate(&indexed) {
existing_attestation.aggregate(&indexed);
aggregated = true;
} else if *existing_attestation == indexed {
aggregated = true;
}
// TODO(electra) in order to be devnet ready, we can skip
// aggregating here for now. this will result in "poorly"
// constructed blocks, but that should be fine for devnet
Attestation::Electra(_) => (),
};
}

if !aggregated {
attestations.push(indexed);
}
}

/// Aggregate Electra attestations for the same attestation data signed by different
/// committees.
///
/// Non-Electra attestations are left as-is.
pub fn aggregate_across_committees(&mut self, checkpoint_key: CheckpointKey) {
let Some(attestation_map) = self.checkpoint_map.get_mut(&checkpoint_key) else {
return;
};
for compact_indexed_attestations in attestation_map.attestations.values_mut() {
let unaggregated_attestations = std::mem::take(compact_indexed_attestations);
let mut aggregated_attestations: Vec<CompactIndexedAttestation<E>> = vec![];

// Aggregate the best attestations for each committee and leave the rest.
let mut best_attestations_by_committee: BTreeMap<
u64,
CompactIndexedAttestationElectra<E>,
> = BTreeMap::new();

for committee_attestation in unaggregated_attestations {
let mut electra_attestation = match committee_attestation {
CompactIndexedAttestation::Electra(att)
if att.committee_bits.num_set_bits() == 1 =>
{
att
}
CompactIndexedAttestation::Electra(att) => {
// Aggregate already covers multiple committees, leave it as-is.
aggregated_attestations.push(CompactIndexedAttestation::Electra(att));
continue;
}
CompactIndexedAttestation::Base(att) => {
// Leave as-is.
aggregated_attestations.push(CompactIndexedAttestation::Base(att));
continue;
}
};
let committee_index = electra_attestation.committee_index();
if let Some(existing_attestation) =
best_attestations_by_committee.get_mut(&committee_index)
{
// Search for the best (most aggregation bits) attestation for this committee
// index.
if electra_attestation.aggregation_bits.num_set_bits()
> existing_attestation.aggregation_bits.num_set_bits()
{
// New attestation is better than the previously known one for this
// committee. Replace it.
std::mem::swap(existing_attestation, &mut electra_attestation);
}
// Put the inferior attestation into the list of aggregated attestations
// without performing any cross-committee aggregation.
aggregated_attestations
.push(CompactIndexedAttestation::Electra(electra_attestation));
} else {
// First attestation seen for this committee. Place it in the map
// provisionally.
best_attestations_by_committee.insert(committee_index, electra_attestation);
}
}

if let Some(on_chain_aggregate) =
Self::compute_on_chain_aggregate(best_attestations_by_committee)
{
aggregated_attestations
.push(CompactIndexedAttestation::Electra(on_chain_aggregate));
}

*compact_indexed_attestations = aggregated_attestations;
}
}

pub fn compute_on_chain_aggregate(
mut attestations_by_committee: BTreeMap<u64, CompactIndexedAttestationElectra<E>>,
) -> Option<CompactIndexedAttestationElectra<E>> {
let (_, mut on_chain_aggregate) = attestations_by_committee.pop_first()?;
for (_, attestation) in attestations_by_committee {
on_chain_aggregate.aggregate_with_disjoint_committees(&attestation);
}
Some(on_chain_aggregate)
}

/// Iterate all attestations matching the given `checkpoint_key`.
pub fn get_attestations<'a>(
&'a self,
Expand Down
Loading
Loading