Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Oppool aggregates use BitArray only for set logic #4034

Merged
merged 5 commits into from
May 19, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/lodestar/src/api/impl/validator/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,7 @@ export function getValidatorApi({chain, config, logger, metrics, network, sync}:

chain.aggregatedAttestationPool.add(
signedAggregateAndProof.message.aggregate,
indexedAttestation.attestingIndices,
indexedAttestation.attestingIndices.length,
committeeIndices
);
const sentPeers = await network.gossip.publishBeaconAggregateAndProof(signedAggregateAndProof);
Expand Down
190 changes: 132 additions & 58 deletions packages/lodestar/src/chain/opPools/aggregatedAttestationPool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import {
} from "@chainsafe/lodestar-beacon-state-transition";
import {toHexString} from "@chainsafe/ssz";
import {MapDef} from "../../util/map";
import {intersectUint8Arrays, IntersectResult} from "../../util/bitArray";
import {pruneBySlot} from "./utils";
import {InsertOutcome} from "./types";

Expand Down Expand Up @@ -54,7 +55,7 @@ export class AggregatedAttestationPool {
);
private lowestPermissibleSlot = 0;

add(attestation: phase0.Attestation, attestingIndices: ValidatorIndex[], committee: ValidatorIndex[]): InsertOutcome {
add(attestation: phase0.Attestation, attestingIndicesCount: number, committee: ValidatorIndex[]): InsertOutcome {
const slot = attestation.data.slot;
const lowestPermissibleSlot = this.lowestPermissibleSlot;

Expand All @@ -73,7 +74,10 @@ export class AggregatedAttestationPool {
attestationGroupByDataHash.set(dataRootHex, attestationGroup);
}

return attestationGroup.add({attestation, attestingIndices: new Set(attestingIndices)});
return attestationGroup.add({
attestation,
trueBitsCount: attestingIndicesCount,
});
}

/** Remove attestations which are too old to be included in a block. */
Expand All @@ -90,10 +94,8 @@ export class AggregatedAttestationPool {
const stateSlot = state.slot;
const stateEpoch = state.epochCtx.epoch;
const statePrevEpoch = stateEpoch - 1;
const forkName = state.config.getForkName(stateSlot);

const getParticipationFn =
forkName === ForkName.phase0 ? this.getParticipationPhase0(state) : this.getParticipationAltair(state);
dapplion marked this conversation as resolved.
Show resolved Hide resolved
const getParticipation = getParticipationFn(state);

const attestationsByScore: AttestationWithScore[] = [];

Expand Down Expand Up @@ -128,7 +130,7 @@ export class AggregatedAttestationPool {
) {
continue;
}
const participation = getParticipationFn(epoch, attestationGroup.committee);
const participation = getParticipation(epoch, attestationGroup.committee);
if (participation === null) {
continue;
}
Expand All @@ -144,7 +146,7 @@ export class AggregatedAttestationPool {
attestationsByScore.push(
...attestationGroup.getAttestationsForBlock(participation).map((attestation) => ({
attestation: attestation.attestation,
score: (attestation.notSeenAttesterCount ?? attestation.attestingIndices.size) / (stateSlot - slot),
score: attestation.notSeenAttesterCount / (stateSlot - slot),
}))
);

Expand Down Expand Up @@ -241,11 +243,15 @@ export class AggregatedAttestationPool {
// eslint-disable-next-line @typescript-eslint/naming-convention
interface AttestationWithIndex {
attestation: phase0.Attestation;
attestingIndices: Set<ValidatorIndex>;
trueBitsCount: number;
}

type AttestationNonParticipant = {
attestation: phase0.Attestation;
// this is <= attestingIndices.count since some attesters may be seen by the chain
// this is only updated and used in removeBySeenValidators function
notSeenAttesterCount?: number;
}
notSeenAttesterCount: number;
};

/**
* Maintain a pool of AggregatedAttestation which all share the same AttestationData.
Expand All @@ -265,64 +271,80 @@ export class MatchingDataAttestationGroup {
* If it's a superset of an existing attestation, remove the existing attestation and add new.
*/
add(attestation: AttestationWithIndex): InsertOutcome {
const {attestingIndices} = attestation;
// preaggregate
let insertResult = InsertOutcome.NewData;
const newBits = attestation.attestation.aggregationBits;

const indicesToRemove = [];
for (const [i, existingAttestation] of this.attestations.entries()) {
const existingAttestingIndices = existingAttestation.attestingIndices;
const numIntersection =
// TODO: Intersect the uint8arrays from BitArray directly, it's probably much faster
existingAttestingIndices.size >= attestingIndices.size
? intersection(existingAttestingIndices, attestingIndices)
: intersection(attestingIndices, existingAttestingIndices);
// no intersection
if (numIntersection === 0) {
aggregateInto(existingAttestation, attestation);
insertResult = InsertOutcome.Aggregated;
} else if (numIntersection === attestingIndices.size) {
// this new attestation is actually a subset of an existing one, don't want to add it
insertResult = InsertOutcome.AlreadyKnown;
} else if (numIntersection === existingAttestingIndices.size) {
// this new attestation is superset of an existing one, remove existing one
indicesToRemove.push(i);

for (const [i, prevAttestation] of this.attestations.entries()) {
const prevBits = prevAttestation.attestation.aggregationBits;

switch (intersectUint8Arrays(newBits.uint8Array, prevBits.uint8Array)) {
case IntersectResult.Subset:
case IntersectResult.Equal:
// this new attestation is actually a subset of an existing one, don't want to add it
return InsertOutcome.AlreadyKnown;

case IntersectResult.Exclusive:
// no intersection
aggregateInto(prevAttestation, attestation);
return InsertOutcome.Aggregated;

case IntersectResult.Superset:
// newBits superset of prevBits
// this new attestation is superset of an existing one, remove existing one
indicesToRemove.push(i);
}
}
if (insertResult === InsertOutcome.NewData) {
for (const index of indicesToRemove.reverse()) {
this.attestations.splice(index, 1);
}
this.attestations.push(attestation);
// Remove the attestations with less participation
if (this.attestations.length > MAX_RETAINED_ATTESTATIONS_PER_GROUP) {
this.attestations.sort((a, b) => b.attestingIndices.size - a.attestingIndices.size);
this.attestations.splice(
MAX_RETAINED_ATTESTATIONS_PER_GROUP,
this.attestations.length - MAX_RETAINED_ATTESTATIONS_PER_GROUP
);
}

// Added new data
for (const index of indicesToRemove.reverse()) {
// TODO: .splice performance warning
this.attestations.splice(index, 1);
}
return insertResult;

this.attestations.push(attestation);

// Remove the attestations with less participation
if (this.attestations.length > MAX_RETAINED_ATTESTATIONS_PER_GROUP) {
this.attestations.sort((a, b) => b.trueBitsCount - a.trueBitsCount);
this.attestations.splice(
MAX_RETAINED_ATTESTATIONS_PER_GROUP,
this.attestations.length - MAX_RETAINED_ATTESTATIONS_PER_GROUP
);
}

return InsertOutcome.NewData;
}

getAttestationsForBlock(seenAttestingIndices: Set<ValidatorIndex>): AttestationWithIndex[] {
const attestations: AttestationWithIndex[] = [];
getAttestationsForBlock(seenAttestingIndices: Set<ValidatorIndex>): AttestationNonParticipant[] {
const attestations: AttestationNonParticipant[] = [];

const committeeLen = this.committee.length;
const committeeSeenAttesting = new Array<boolean>(committeeLen);

for (const attestation of this.attestations) {
// Intersect committee with participation only once for all attestations
for (let i = 0; i < committeeLen; i++) {
committeeSeenAttesting[i] = seenAttestingIndices.has(this.committee[i]);
}

for (const {attestation} of this.attestations) {
const {aggregationBits} = attestation;
let notSeenAttesterCount = 0;
for (const attIndex of attestation.attestingIndices) {
if (!seenAttestingIndices.has(attIndex)) notSeenAttesterCount++;

for (let i = 0; i < committeeLen; i++) {
// TODO: Optimize aggregationBits.get() in bulk for the entire BitArray
if (!committeeSeenAttesting[i] && aggregationBits.get(i)) {
twoeths marked this conversation as resolved.
Show resolved Hide resolved
notSeenAttesterCount++;
}
}

if (notSeenAttesterCount > 0) {
attestations.push({...attestation, notSeenAttesterCount});
attestations.push({attestation, notSeenAttesterCount});
}
}

return attestations
.sort(
(a, b) =>
(b.notSeenAttesterCount ?? b.attestingIndices.size) - (a.notSeenAttesterCount ?? a.attestingIndices.size)
)
.sort((a, b) => b.notSeenAttesterCount - a.notSeenAttesterCount)
.slice(0, MAX_ATTESTATIONS_PER_GROUP);
}

Expand All @@ -333,10 +355,6 @@ export class MatchingDataAttestationGroup {
}

export function aggregateInto(attestation1: AttestationWithIndex, attestation2: AttestationWithIndex): void {
for (const attIndex of attestation2.attestingIndices) {
attestation1.attestingIndices.add(attIndex);
}

// Merge bits of attestation2 into attestation1
attestation1.attestation.aggregationBits.mergeOrWith(attestation2.attestation.aggregationBits);

Expand All @@ -345,6 +363,62 @@ export function aggregateInto(attestation1: AttestationWithIndex, attestation2:
attestation1.attestation.signature = bls.Signature.aggregate([signature1, signature2]).toBytes();
}

/**
* Pre-compute participation from a CachedBeaconStateAllForks, for use to check if an attestation's committee
* has already attested or not.
*/
export function getParticipationFn(state: CachedBeaconStateAllForks): GetParticipationFn {
if (state.config.getForkName(state.slot) === ForkName.phase0) {
// Get attestations to be included in a phase0 block.
// As we are close to altair, this is not really important, it's mainly for e2e.
// The performance is not great due to the different BeaconState data structure to altair.
// check for phase0 block already
const phase0State = state as CachedBeaconStatePhase0;
const stateEpoch = computeEpochAtSlot(state.slot);

const previousEpochParticipants = extractParticipation(
phase0State.previousEpochAttestations.getAllReadonly(),
state
);
const currentEpochParticipants = extractParticipation(phase0State.currentEpochAttestations.getAllReadonly(), state);

return (epoch: Epoch) => {
return epoch === stateEpoch
? currentEpochParticipants
: epoch === stateEpoch - 1
? previousEpochParticipants
: null;
};
}

// altair and future forks
else {
// Get attestations to be included in an altair block.
// Attestations are sorted by inclusion distance then number of attesters.
// Attestations should pass the validation when processing attestations in beacon-state-transition.
// check for altair block already
const altairState = state as CachedBeaconStateAltair;
const previousParticipation = altairState.previousEpochParticipation.getAll();
const currentParticipation = altairState.currentEpochParticipation.getAll();
const stateEpoch = computeEpochAtSlot(state.slot);

return (epoch: Epoch, committee: number[]) => {
const participationStatus =
epoch === stateEpoch ? currentParticipation : epoch === stateEpoch - 1 ? previousParticipation : null;

if (participationStatus === null) return null;

const seenValidatorIndices = new Set<ValidatorIndex>();
for (const validatorIndex of committee) {
if (flagIsTimelySource(participationStatus[validatorIndex])) {
seenValidatorIndices.add(validatorIndex);
}
}
return seenValidatorIndices;
};
}
}

dapplion marked this conversation as resolved.
Show resolved Hide resolved
export function extractParticipation(
attestations: phase0.PendingAttestation[],
state: CachedBeaconStateAllForks
Expand Down
4 changes: 2 additions & 2 deletions packages/lodestar/src/network/gossip/handlers/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import {toHexString} from "@chainsafe/ssz";
import PeerId from "peer-id";
import {IBeaconConfig} from "@chainsafe/lodestar-config";
import {phase0, ssz, ValidatorIndex} from "@chainsafe/lodestar-types";
import {phase0, ssz} from "@chainsafe/lodestar-types";
import {ILogger, prettyBytes} from "@chainsafe/lodestar-utils";
import {IMetrics} from "../../../metrics";
import {OpSource} from "../../../metrics/validatorMonitor";
Expand Down Expand Up @@ -168,7 +168,7 @@ export function getGossipHandlers(modules: ValidatorFnsModules, options: GossipH

chain.aggregatedAttestationPool.add(
aggregatedAttestation,
indexedAttestation.attestingIndices as ValidatorIndex[],
indexedAttestation.attestingIndices.length,
committeeIndices
);

Expand Down
74 changes: 74 additions & 0 deletions packages/lodestar/src/util/bitArray.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
export enum IntersectResult {
Equal,
/** All elements in set B are in set A */
Superset,
/** All elements in set A are in set B */
Subset,
/** Set A and set B do not share any elements */
Exclusive,
/** Set A and set B intersect but are not superset or subset */
Intersect,
}

/**
* For each byte check if a includes b,
* | a | b | result |
* | -------- | -------- | ------------- |
* | 00001111 | 00001111 | A equals B |
* | 00001111 | 00000011 | A superset B |
* | 00000011 | 00001111 | A subset B |
* | 11110000 | 00001111 | A exclude B |
* | 11111100 | 00111111 | A intersect B |
*
* For all bytes in BitArray:
* - equals = MAYBE ONLY equals
* - excludes = MUST ONLY equals
* - superset = MUST superset MAYBE equal
* - subset = MUST subset MAYBE equal
* - intersect = any other condition
*/
export function intersectUint8Arrays(aUA: Uint8Array, bUA: Uint8Array): IntersectResult {
const len = aUA.length;

let someEquals = false;
let someExcludes = false;
let someSuperset = false;
let someSubset = false;

for (let i = 0; i < len; i++) {
const a = aUA[i];
const b = bUA[i];

if (a === 0 && b === 0) {
// zero, skip
} else if (a === b) {
// A equals B
someEquals = true;
} else if ((a & b) === 0) {
// A excludes B
someExcludes = true;
} else if ((a & b) === b) {
// A superset B
if (someSubset) return IntersectResult.Intersect;
someSuperset = true;
} else if ((a & b) === a) {
// A subset B
if (someSuperset) return IntersectResult.Intersect;
someSubset = true;
} else {
// A diff B
return IntersectResult.Intersect;
}
}

// equals = MAYBE ONLY equals
if (!someExcludes && !someSuperset && !someSubset) return IntersectResult.Equal;
// excludes = MUST ONLY equals
if (!someEquals && someExcludes && !someSuperset && !someSubset) return IntersectResult.Exclusive;
// superset = MUST superset MAYBE equal
if (!someExcludes && someSuperset && !someSubset) return IntersectResult.Superset;
// subset = MUST subset MAYBE equal
if (!someExcludes && !someSuperset && someSubset) return IntersectResult.Subset;
// intersect = any other condition
else return IntersectResult.Intersect;
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ function getAggregatedAttestationPool(state: CachedBeaconStateAltair): Aggregate
const committee = state.epochCtx.getBeaconCommittee(slot, committeeIndex);
// all attestation has full participation so getAttestationsForBlock() has to do a lot of filter
// aggregate_and_proof messages
pool.add(attestation, committee, committee);
pool.add(attestation, committee.length, committee);
}
}
return pool;
Expand Down
Loading