Skip to content

Commit

Permalink
Oppool aggregates use BitArray only for set logic (#4034)
Browse files Browse the repository at this point in the history
* Use BitArrays for aggregate merging

* Test intersectUint8Arrays

* Review PR

* Update tests

* Remove un-used code
  • Loading branch information
dapplion authored May 19, 2022
1 parent 94699ee commit 2535f40
Show file tree
Hide file tree
Showing 8 changed files with 483 additions and 256 deletions.
2 changes: 1 addition & 1 deletion packages/lodestar/src/api/impl/validator/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,7 @@ export function getValidatorApi({chain, config, logger, metrics, network, sync}:

chain.aggregatedAttestationPool.add(
signedAggregateAndProof.message.aggregate,
indexedAttestation.attestingIndices,
indexedAttestation.attestingIndices.length,
committeeIndices
);
const sentPeers = await network.gossip.publishBeaconAggregateAndProof(signedAggregateAndProof);
Expand Down
251 changes: 132 additions & 119 deletions packages/lodestar/src/chain/opPools/aggregatedAttestationPool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import {
} from "@chainsafe/lodestar-beacon-state-transition";
import {toHexString} from "@chainsafe/ssz";
import {MapDef} from "../../util/map";
import {intersectUint8Arrays, IntersectResult} from "../../util/bitArray";
import {pruneBySlot} from "./utils";
import {InsertOutcome} from "./types";

Expand Down Expand Up @@ -54,7 +55,7 @@ export class AggregatedAttestationPool {
);
private lowestPermissibleSlot = 0;

add(attestation: phase0.Attestation, attestingIndices: ValidatorIndex[], committee: ValidatorIndex[]): InsertOutcome {
add(attestation: phase0.Attestation, attestingIndicesCount: number, committee: ValidatorIndex[]): InsertOutcome {
const slot = attestation.data.slot;
const lowestPermissibleSlot = this.lowestPermissibleSlot;

Expand All @@ -73,7 +74,10 @@ export class AggregatedAttestationPool {
attestationGroupByDataHash.set(dataRootHex, attestationGroup);
}

return attestationGroup.add({attestation, attestingIndices: new Set(attestingIndices)});
return attestationGroup.add({
attestation,
trueBitsCount: attestingIndicesCount,
});
}

/** Remove attestations which are too old to be included in a block. */
Expand All @@ -90,10 +94,8 @@ export class AggregatedAttestationPool {
const stateSlot = state.slot;
const stateEpoch = state.epochCtx.epoch;
const statePrevEpoch = stateEpoch - 1;
const forkName = state.config.getForkName(stateSlot);

const getParticipationFn =
forkName === ForkName.phase0 ? this.getParticipationPhase0(state) : this.getParticipationAltair(state);
const getParticipation = getParticipationFn(state);

const attestationsByScore: AttestationWithScore[] = [];

Expand Down Expand Up @@ -128,7 +130,7 @@ export class AggregatedAttestationPool {
) {
continue;
}
const participation = getParticipationFn(epoch, attestationGroup.committee);
const participation = getParticipation(epoch, attestationGroup.committee);
if (participation === null) {
continue;
}
Expand All @@ -144,7 +146,7 @@ export class AggregatedAttestationPool {
attestationsByScore.push(
...attestationGroup.getAttestationsForBlock(participation).map((attestation) => ({
attestation: attestation.attestation,
score: (attestation.notSeenAttesterCount ?? attestation.attestingIndices.size) / (stateSlot - slot),
score: attestation.notSeenAttesterCount / (stateSlot - slot),
}))
);

Expand Down Expand Up @@ -183,69 +185,20 @@ export class AggregatedAttestationPool {
}
return attestations;
}

/**
* Get attestations to be included in a phase0 block.
* As we are close to altair, this is not really important, it's mainly for e2e.
* The performance is not great due to the different BeaconState data structure to altair.
*/
private getParticipationPhase0(state: CachedBeaconStateAllForks): GetParticipationFn {
// check for phase0 block already
const phase0State = state as CachedBeaconStatePhase0;
const stateEpoch = computeEpochAtSlot(state.slot);

const previousEpochParticipants = extractParticipation(
phase0State.previousEpochAttestations.getAllReadonly(),
state
);
const currentEpochParticipants = extractParticipation(phase0State.currentEpochAttestations.getAllReadonly(), state);

return (epoch: Epoch) => {
return epoch === stateEpoch
? currentEpochParticipants
: epoch === stateEpoch - 1
? previousEpochParticipants
: null;
};
}

/**
* Get attestations to be included in an altair block.
* Attestations are sorted by inclusion distance then number of attesters.
* Attestations should pass the validation when processing attestations in beacon-state-transition.
*/
private getParticipationAltair(state: CachedBeaconStateAllForks): GetParticipationFn {
// check for altair block already
const altairState = state as CachedBeaconStateAltair;
const stateEpoch = computeEpochAtSlot(state.slot);
const previousParticipation = altairState.previousEpochParticipation.getAll();
const currentParticipation = altairState.currentEpochParticipation.getAll();

return (epoch: Epoch, committee: number[]) => {
const participationStatus =
epoch === stateEpoch ? currentParticipation : epoch === stateEpoch - 1 ? previousParticipation : null;

if (participationStatus === null) return null;

const seenValidatorIndices = new Set<ValidatorIndex>();
for (const validatorIndex of committee) {
if (flagIsTimelySource(participationStatus[validatorIndex])) {
seenValidatorIndices.add(validatorIndex);
}
}
return seenValidatorIndices;
};
}
}

// eslint-disable-next-line @typescript-eslint/naming-convention
interface AttestationWithIndex {
attestation: phase0.Attestation;
attestingIndices: Set<ValidatorIndex>;
trueBitsCount: number;
}

type AttestationNonParticipant = {
attestation: phase0.Attestation;
// this is <= attestingIndices.count since some attesters may be seen by the chain
// this is only updated and used in removeBySeenValidators function
notSeenAttesterCount?: number;
}
notSeenAttesterCount: number;
};

/**
* Maintain a pool of AggregatedAttestation which all share the same AttestationData.
Expand All @@ -265,64 +218,80 @@ export class MatchingDataAttestationGroup {
* If it's a superset of an existing attestation, remove the existing attestation and add new.
*/
add(attestation: AttestationWithIndex): InsertOutcome {
const {attestingIndices} = attestation;
// preaggregate
let insertResult = InsertOutcome.NewData;
const newBits = attestation.attestation.aggregationBits;

const indicesToRemove = [];
for (const [i, existingAttestation] of this.attestations.entries()) {
const existingAttestingIndices = existingAttestation.attestingIndices;
const numIntersection =
// TODO: Intersect the uint8arrays from BitArray directly, it's probably much faster
existingAttestingIndices.size >= attestingIndices.size
? intersection(existingAttestingIndices, attestingIndices)
: intersection(attestingIndices, existingAttestingIndices);
// no intersection
if (numIntersection === 0) {
aggregateInto(existingAttestation, attestation);
insertResult = InsertOutcome.Aggregated;
} else if (numIntersection === attestingIndices.size) {
// this new attestation is actually a subset of an existing one, don't want to add it
insertResult = InsertOutcome.AlreadyKnown;
} else if (numIntersection === existingAttestingIndices.size) {
// this new attestation is superset of an existing one, remove existing one
indicesToRemove.push(i);

for (const [i, prevAttestation] of this.attestations.entries()) {
const prevBits = prevAttestation.attestation.aggregationBits;

switch (intersectUint8Arrays(newBits.uint8Array, prevBits.uint8Array)) {
case IntersectResult.Subset:
case IntersectResult.Equal:
// this new attestation is actually a subset of an existing one, don't want to add it
return InsertOutcome.AlreadyKnown;

case IntersectResult.Exclusive:
// no intersection
aggregateInto(prevAttestation, attestation);
return InsertOutcome.Aggregated;

case IntersectResult.Superset:
// newBits superset of prevBits
// this new attestation is superset of an existing one, remove existing one
indicesToRemove.push(i);
}
}
if (insertResult === InsertOutcome.NewData) {
for (const index of indicesToRemove.reverse()) {
this.attestations.splice(index, 1);
}
this.attestations.push(attestation);
// Remove the attestations with less participation
if (this.attestations.length > MAX_RETAINED_ATTESTATIONS_PER_GROUP) {
this.attestations.sort((a, b) => b.attestingIndices.size - a.attestingIndices.size);
this.attestations.splice(
MAX_RETAINED_ATTESTATIONS_PER_GROUP,
this.attestations.length - MAX_RETAINED_ATTESTATIONS_PER_GROUP
);
}

// Added new data
for (const index of indicesToRemove.reverse()) {
// TODO: .splice performance warning
this.attestations.splice(index, 1);
}
return insertResult;

this.attestations.push(attestation);

// Remove the attestations with less participation
if (this.attestations.length > MAX_RETAINED_ATTESTATIONS_PER_GROUP) {
this.attestations.sort((a, b) => b.trueBitsCount - a.trueBitsCount);
this.attestations.splice(
MAX_RETAINED_ATTESTATIONS_PER_GROUP,
this.attestations.length - MAX_RETAINED_ATTESTATIONS_PER_GROUP
);
}

return InsertOutcome.NewData;
}

getAttestationsForBlock(seenAttestingIndices: Set<ValidatorIndex>): AttestationWithIndex[] {
const attestations: AttestationWithIndex[] = [];
getAttestationsForBlock(seenAttestingIndices: Set<ValidatorIndex>): AttestationNonParticipant[] {
const attestations: AttestationNonParticipant[] = [];

const committeeLen = this.committee.length;
const committeeSeenAttesting = new Array<boolean>(committeeLen);

for (const attestation of this.attestations) {
// Intersect committee with participation only once for all attestations
for (let i = 0; i < committeeLen; i++) {
committeeSeenAttesting[i] = seenAttestingIndices.has(this.committee[i]);
}

for (const {attestation} of this.attestations) {
const {aggregationBits} = attestation;
let notSeenAttesterCount = 0;
for (const attIndex of attestation.attestingIndices) {
if (!seenAttestingIndices.has(attIndex)) notSeenAttesterCount++;

for (let i = 0; i < committeeLen; i++) {
// TODO: Optimize aggregationBits.get() in bulk for the entire BitArray
if (!committeeSeenAttesting[i] && aggregationBits.get(i)) {
notSeenAttesterCount++;
}
}

if (notSeenAttesterCount > 0) {
attestations.push({...attestation, notSeenAttesterCount});
attestations.push({attestation, notSeenAttesterCount});
}
}

return attestations
.sort(
(a, b) =>
(b.notSeenAttesterCount ?? b.attestingIndices.size) - (a.notSeenAttesterCount ?? a.attestingIndices.size)
)
.sort((a, b) => b.notSeenAttesterCount - a.notSeenAttesterCount)
.slice(0, MAX_ATTESTATIONS_PER_GROUP);
}

Expand All @@ -333,10 +302,6 @@ export class MatchingDataAttestationGroup {
}

export function aggregateInto(attestation1: AttestationWithIndex, attestation2: AttestationWithIndex): void {
for (const attIndex of attestation2.attestingIndices) {
attestation1.attestingIndices.add(attIndex);
}

// Merge bits of attestation2 into attestation1
attestation1.attestation.aggregationBits.mergeOrWith(attestation2.attestation.aggregationBits);

Expand All @@ -345,6 +310,62 @@ export function aggregateInto(attestation1: AttestationWithIndex, attestation2:
attestation1.attestation.signature = bls.Signature.aggregate([signature1, signature2]).toBytes();
}

/**
* Pre-compute participation from a CachedBeaconStateAllForks, for use to check if an attestation's committee
* has already attested or not.
*/
export function getParticipationFn(state: CachedBeaconStateAllForks): GetParticipationFn {
if (state.config.getForkName(state.slot) === ForkName.phase0) {
// Get attestations to be included in a phase0 block.
// As we are close to altair, this is not really important, it's mainly for e2e.
// The performance is not great due to the different BeaconState data structure to altair.
// check for phase0 block already
const phase0State = state as CachedBeaconStatePhase0;
const stateEpoch = computeEpochAtSlot(state.slot);

const previousEpochParticipants = extractParticipation(
phase0State.previousEpochAttestations.getAllReadonly(),
state
);
const currentEpochParticipants = extractParticipation(phase0State.currentEpochAttestations.getAllReadonly(), state);

return (epoch: Epoch) => {
return epoch === stateEpoch
? currentEpochParticipants
: epoch === stateEpoch - 1
? previousEpochParticipants
: null;
};
}

// altair and future forks
else {
// Get attestations to be included in an altair block.
// Attestations are sorted by inclusion distance then number of attesters.
// Attestations should pass the validation when processing attestations in beacon-state-transition.
// check for altair block already
const altairState = state as CachedBeaconStateAltair;
const previousParticipation = altairState.previousEpochParticipation.getAll();
const currentParticipation = altairState.currentEpochParticipation.getAll();
const stateEpoch = computeEpochAtSlot(state.slot);

return (epoch: Epoch, committee: number[]) => {
const participationStatus =
epoch === stateEpoch ? currentParticipation : epoch === stateEpoch - 1 ? previousParticipation : null;

if (participationStatus === null) return null;

const seenValidatorIndices = new Set<ValidatorIndex>();
for (const validatorIndex of committee) {
if (flagIsTimelySource(participationStatus[validatorIndex])) {
seenValidatorIndices.add(validatorIndex);
}
}
return seenValidatorIndices;
};
}
}

export function extractParticipation(
attestations: phase0.PendingAttestation[],
state: CachedBeaconStateAllForks
Expand All @@ -365,14 +386,6 @@ export function extractParticipation(
return allParticipants;
}

export function intersection(bigSet: Set<ValidatorIndex>, smallSet: Set<ValidatorIndex>): number {
let numIntersection = 0;
for (const validatorIndex of smallSet) {
if (bigSet.has(validatorIndex)) numIntersection++;
}
return numIntersection;
}

/**
* The state transition accepts incorrect target and head attestations.
* We only need to validate the source checkpoint.
Expand Down
4 changes: 2 additions & 2 deletions packages/lodestar/src/network/gossip/handlers/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import {toHexString} from "@chainsafe/ssz";
import PeerId from "peer-id";
import {IBeaconConfig} from "@chainsafe/lodestar-config";
import {phase0, ssz, ValidatorIndex} from "@chainsafe/lodestar-types";
import {phase0, ssz} from "@chainsafe/lodestar-types";
import {ILogger, prettyBytes} from "@chainsafe/lodestar-utils";
import {IMetrics} from "../../../metrics";
import {OpSource} from "../../../metrics/validatorMonitor";
Expand Down Expand Up @@ -168,7 +168,7 @@ export function getGossipHandlers(modules: ValidatorFnsModules, options: GossipH

chain.aggregatedAttestationPool.add(
aggregatedAttestation,
indexedAttestation.attestingIndices as ValidatorIndex[],
indexedAttestation.attestingIndices.length,
committeeIndices
);

Expand Down
Loading

0 comments on commit 2535f40

Please sign in to comment.