Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: batch validation for electra attestations #6788

Merged
merged 12 commits into from
May 16, 2024
6 changes: 3 additions & 3 deletions packages/beacon-node/src/api/impl/beacon/pool/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import {routes, ServerApi} from "@lodestar/api";
import {Epoch, ssz} from "@lodestar/types";
import {CommitteeIndex, Epoch, isElectraAttestation, ssz} from "@lodestar/types";
import {ForkName, SYNC_COMMITTEE_SUBNET_SIZE} from "@lodestar/params";
import {validateApiAttestation} from "../../../../chain/validation/index.js";
import {validateApiAttesterSlashing} from "../../../../chain/validation/attesterSlashing.js";
Expand Down Expand Up @@ -64,7 +64,7 @@ export function getBeaconPoolApi({
// when a validator is configured with multiple beacon node urls, this attestation data may come from another beacon node
// and the block hasn't been in our forkchoice since we haven't seen / processing that block
// see https://github.com/ChainSafe/lodestar/issues/5098
const {indexedAttestation, subnet, attDataRootHex} = await validateGossipFnRetryUnknownRoot(
const {indexedAttestation, subnet, attDataRootHex, committeeIndex} = await validateGossipFnRetryUnknownRoot(
validateFn,
network,
chain,
Expand All @@ -73,7 +73,7 @@ export function getBeaconPoolApi({
);

if (network.shouldAggregate(subnet, slot)) {
const insertOutcome = chain.attestationPool.add(attestation, attDataRootHex);
const insertOutcome = chain.attestationPool.add(committeeIndex, attestation, attDataRootHex);
metrics?.opPool.attestationPoolInsertOutcome.inc({insertOutcome});
}

Expand Down
29 changes: 14 additions & 15 deletions packages/beacon-node/src/chain/opPools/attestationPool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,10 @@ type CommitteeIndex = number;
* receives and it can be triggered manually.
*/
export class AttestationPool {
private readonly attestationByRootBySlot = new MapDef<Slot, Map<DataRootHex, Map<CommitteeIndex, AggregateFast>>>(
() => new Map<DataRootHex, Map<CommitteeIndex, AggregateFast>>()
);
private readonly aggregateByIndexByRootBySlot = new MapDef<
Slot,
Map<DataRootHex, Map<CommitteeIndex, AggregateFast>>
>(() => new Map<DataRootHex, Map<CommitteeIndex, AggregateFast>>());
private lowestPermissibleSlot = 0;

constructor(
Expand All @@ -76,8 +77,10 @@ export class AttestationPool {
/** Returns current count of pre-aggregated attestations with unique data */
getAttestationCount(): number {
let attestationCount = 0;
for (const attestationByRoot of this.attestationByRootBySlot.values()) {
attestationCount += attestationByRoot.size;
for (const attestationByIndexByRoot of this.aggregateByIndexByRootBySlot.values()) {
for (const attestationByIndex of attestationByIndexByRoot.values()) {
attestationCount += attestationByIndex.size;
}
}
return attestationCount;
}
Expand All @@ -99,7 +102,7 @@ export class AttestationPool {
* - Valid committeeIndex
* - Valid data
*/
add(attestation: allForks.Attestation, attDataRootHex: RootHex): InsertOutcome {
add(committeeIndex: CommitteeIndex, attestation: allForks.Attestation, attDataRootHex: RootHex): InsertOutcome {
const slot = attestation.data.slot;
const lowestPermissibleSlot = this.lowestPermissibleSlot;

Expand All @@ -114,15 +117,11 @@ export class AttestationPool {
}

// Limit object per slot
const aggregateByRoot = this.attestationByRootBySlot.getOrDefault(slot);
const aggregateByRoot = this.aggregateByIndexByRootBySlot.getOrDefault(slot);
if (aggregateByRoot.size >= MAX_ATTESTATIONS_PER_SLOT) {
throw new OpPoolError({code: OpPoolErrorCode.REACHED_MAX_PER_SLOT});
}

const committeeIndex = isElectraAttestation(attestation)
? // this attestation is added to pool after validation
attestation.committeeBits.getSingleTrueBit()
: attestation.data.index;
// this should not happen because attestation should be validated before reaching this
assert.notNull(committeeIndex, "Committee index should not be null in attestation pool");

Expand All @@ -147,7 +146,7 @@ export class AttestationPool {
* For validator API to get an aggregate
*/
getAggregate(slot: Slot, committeeIndex: CommitteeIndex, dataRootHex: RootHex): allForks.Attestation | null {
const aggregate = this.attestationByRootBySlot.get(slot)?.get(dataRootHex)?.get(committeeIndex);
const aggregate = this.aggregateByIndexByRootBySlot.get(slot)?.get(dataRootHex)?.get(committeeIndex);
if (!aggregate) {
// TODO: Add metric for missing aggregates
return null;
Expand All @@ -161,7 +160,7 @@ export class AttestationPool {
* By default, not interested in attestations in old slots, we only preaggregate attestations for the current slot.
*/
prune(clockSlot: Slot): void {
pruneBySlot(this.attestationByRootBySlot, clockSlot, SLOTS_RETAINED);
pruneBySlot(this.aggregateByIndexByRootBySlot, clockSlot, SLOTS_RETAINED);
// by default preaggregateSlotDistance is 0, i.e only accept attestations in the same clock slot.
this.lowestPermissibleSlot = Math.max(clockSlot - this.preaggregateSlotDistance, 0);
}
Expand All @@ -175,8 +174,8 @@ export class AttestationPool {

const aggregateByRoots =
bySlot === undefined
? Array.from(this.attestationByRootBySlot.values())
: [this.attestationByRootBySlot.get(bySlot)];
? Array.from(this.aggregateByIndexByRootBySlot.values())
: [this.aggregateByIndexByRootBySlot.get(bySlot)];

for (const aggregateByRoot of aggregateByRoots) {
if (aggregateByRoot) {
Expand Down
24 changes: 15 additions & 9 deletions packages/beacon-node/src/chain/seenCache/seenAttestationData.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
import {phase0, RootHex, Slot} from "@lodestar/types";
import {BitArray} from "@chainsafe/ssz";
import {CommitteeIndex, phase0, RootHex, Slot} from "@lodestar/types";
import {MapDef} from "@lodestar/utils";
import {Metrics} from "../../metrics/metrics.js";
import {AttDataBase64} from "../../util/sszBytes.js";
import {SeenAttDataKey} from "../../util/sszBytes.js";
import {InsertOutcome} from "../opPools/types.js";

export type AttestationDataCacheEntry = {
// part of shuffling data, so this does not take memory
committeeIndices: Uint32Array;
committeeValidatorIndices: Uint32Array;
// undefined for phase0 Attestation
committeeBits?: BitArray;
committeeIndex: CommitteeIndex;
// IndexedAttestationData signing root, 32 bytes
signingRoot: Uint8Array;
// to be consumed by forkchoice and oppool
Expand Down Expand Up @@ -38,12 +42,14 @@ const DEFAULT_MAX_CACHE_SIZE_PER_SLOT = 200;
const DEFAULT_CACHE_SLOT_DISTANCE = 2;

/**
* Cached seen AttestationData to improve gossip validation. For Electra, this still take into account attestationIndex
* even through it is moved outside of AttestationData.
* As of April 2023, validating gossip attestation takes ~12% of cpu time for a node subscribing to all subnets on mainnet.
* Having this cache help saves a lot of cpu time since most of the gossip attestations are on the same slot.
*/
export class SeenAttestationDatas {
private cacheEntryByAttDataBase64BySlot = new MapDef<Slot, Map<AttDataBase64, AttestationDataCacheEntry>>(
() => new Map<AttDataBase64, AttestationDataCacheEntry>()
private cacheEntryByAttDataBase64BySlot = new MapDef<Slot, Map<SeenAttDataKey, AttestationDataCacheEntry>>(
() => new Map<SeenAttDataKey, AttestationDataCacheEntry>()
);
private lowestPermissibleSlot = 0;

Expand All @@ -57,14 +63,14 @@ export class SeenAttestationDatas {
}

// TODO: Move InsertOutcome type definition to a common place
add(slot: Slot, attDataBase64: AttDataBase64, cacheEntry: AttestationDataCacheEntry): InsertOutcome {
add(slot: Slot, attDataKey: SeenAttDataKey, cacheEntry: AttestationDataCacheEntry): InsertOutcome {
if (slot < this.lowestPermissibleSlot) {
this.metrics?.seenCache.attestationData.reject.inc({reason: RejectReason.too_old});
return InsertOutcome.Old;
}

const cacheEntryByAttDataBase64 = this.cacheEntryByAttDataBase64BySlot.getOrDefault(slot);
if (cacheEntryByAttDataBase64.has(attDataBase64)) {
if (cacheEntryByAttDataBase64.has(attDataKey)) {
this.metrics?.seenCache.attestationData.reject.inc({reason: RejectReason.already_known});
return InsertOutcome.AlreadyKnown;
}
Expand All @@ -74,11 +80,11 @@ export class SeenAttestationDatas {
return InsertOutcome.ReachLimit;
}

cacheEntryByAttDataBase64.set(attDataBase64, cacheEntry);
cacheEntryByAttDataBase64.set(attDataKey, cacheEntry);
return InsertOutcome.NewData;
}

get(slot: Slot, attDataBase64: AttDataBase64): AttestationDataCacheEntry | null {
get(slot: Slot, attDataBase64: SeenAttDataKey): AttestationDataCacheEntry | null {
const cacheEntryByAttDataBase64 = this.cacheEntryByAttDataBase64BySlot.get(slot);
const cacheEntry = cacheEntryByAttDataBase64?.get(attDataBase64);
if (cacheEntry) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ async function validateAggregateAndProof(
// [REJECT] The committee index is within the expected range
// -- i.e. data.index < get_committee_count_per_slot(state, data.target.epoch)
const committeeIndices = cachedAttData
? cachedAttData.committeeIndices
? cachedAttData.committeeValidatorIndices
: getCommitteeIndices(shuffling, attSlot, attIndex);

// [REJECT] The number of aggregation bits matches the committee size
Expand Down
Loading
Loading