Skip to content

Commit

Permalink
feat: pre-electra support from attestation pool (#6998)
Browse files Browse the repository at this point in the history
* Initial commit

* Update packages/beacon-node/src/chain/chain.ts

Co-authored-by: Nico Flaig <[email protected]>

* Update packages/beacon-node/src/api/impl/validator/index.ts

Co-authored-by: Nico Flaig <[email protected]>

* Update packages/beacon-node/src/chain/opPools/aggregatedAttestationPool.ts

Co-authored-by: Nico Flaig <[email protected]>

* address comment

* Add unit test for attestation pool

* fix: getSeenAttDataKey apis (#7009)

* fix: getSeenAttDataKey apis

* chore: use ForkName instead of ForkSeq

* Update packages/beacon-node/src/util/sszBytes.ts

Co-authored-by: Nico Flaig <[email protected]>

* Update packages/beacon-node/src/chain/opPools/aggregatedAttestationPool.ts

Co-authored-by: Nico Flaig <[email protected]>

* address comment

* Update error message

* Update packages/beacon-node/src/chain/opPools/aggregatedAttestationPool.ts

Co-authored-by: Nico Flaig <[email protected]>

* address comment

* Move determining post-electra fork out of loops

---------

Co-authored-by: Nico Flaig <[email protected]>
Co-authored-by: twoeths <[email protected]>
  • Loading branch information
3 people authored and philknows committed Sep 3, 2024
1 parent 1aecf90 commit b572236
Show file tree
Hide file tree
Showing 20 changed files with 437 additions and 136 deletions.
20 changes: 17 additions & 3 deletions packages/beacon-node/src/api/impl/beacon/pool/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import {routes} from "@lodestar/api";
import {ApplicationMethods} from "@lodestar/api/server";
import {Epoch, ssz} from "@lodestar/types";
import {ForkName, SYNC_COMMITTEE_SUBNET_SIZE} from "@lodestar/params";
import {Attestation, Epoch, isElectraAttestation, ssz} from "@lodestar/types";
import {ForkName, SYNC_COMMITTEE_SUBNET_SIZE, isForkPostElectra} from "@lodestar/params";
import {validateApiAttestation} from "../../../../chain/validation/index.js";
import {validateApiAttesterSlashing} from "../../../../chain/validation/attesterSlashing.js";
import {validateApiProposerSlashing} from "../../../../chain/validation/proposerSlashing.js";
Expand All @@ -16,6 +16,7 @@ import {
SyncCommitteeError,
} from "../../../../chain/errors/index.js";
import {validateGossipFnRetryUnknownRoot} from "../../../../network/processor/gossipHandlers.js";
import {ApiError} from "../../errors.js";

export function getBeaconPoolApi({
chain,
Expand All @@ -26,7 +27,15 @@ export function getBeaconPoolApi({
return {
async getPoolAttestations({slot, committeeIndex}) {
// Already filtered by slot
let attestations = chain.aggregatedAttestationPool.getAll(slot);
let attestations: Attestation[] = chain.aggregatedAttestationPool.getAll(slot);
const fork = chain.config.getForkName(slot ?? chain.clock.currentSlot);

if (isForkPostElectra(fork)) {
throw new ApiError(
400,
`Use getPoolAttestationsV2 to retrieve pool attestations for post-electra fork=${fork}`
);
}

if (committeeIndex !== undefined) {
attestations = attestations.filter((attestation) => committeeIndex === attestation.data.index);
Expand All @@ -39,6 +48,11 @@ export function getBeaconPoolApi({
// Already filtered by slot
let attestations = chain.aggregatedAttestationPool.getAll(slot);
const fork = chain.config.getForkName(slot ?? attestations[0]?.data.slot ?? chain.clock.currentSlot);
const isPostElectra = isForkPostElectra(fork);

attestations = attestations.filter((attestation) =>
isPostElectra ? isElectraAttestation(attestation) : !isElectraAttestation(attestation)
);

if (committeeIndex !== undefined) {
attestations = attestations.filter((attestation) => committeeIndex === attestation.data.index);
Expand Down
28 changes: 25 additions & 3 deletions packages/beacon-node/src/api/impl/validator/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1073,9 +1073,31 @@ export function getValidatorApi(
};
},

// TODO Electra: Implement getAggregatedAttestation to properly handle pre-electra
async getAggregatedAttestation() {
throw new Error("Not implemented. Use getAggregatedAttestationV2 for now.");
async getAggregatedAttestation({attestationDataRoot, slot}) {
notWhileSyncing();

await waitForSlot(slot); // Must never request for a future slot > currentSlot

const dataRootHex = toHex(attestationDataRoot);
const aggregate = chain.attestationPool.getAggregate(slot, null, dataRootHex);
const fork = chain.config.getForkName(slot);

if (isForkPostElectra(fork)) {
throw new ApiError(
400,
`Use getAggregatedAttestationV2 to retrieve aggregated attestations for post-electra fork=${fork}`
);
}

if (!aggregate) {
throw new ApiError(404, `No aggregated attestation for slot=${slot}, dataRoot=${dataRootHex}`);
}

metrics?.production.producedAggregateParticipants.observe(aggregate.aggregationBits.getTrueBitIndexes().length);

return {
data: aggregate,
};
},

async getAggregatedAttestationV2({attestationDataRoot, slot, committeeIndex}) {
Expand Down
10 changes: 8 additions & 2 deletions packages/beacon-node/src/chain/chain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ export class BeaconChain implements IBeaconChain {

// Ops pool
readonly attestationPool: AttestationPool;
readonly aggregatedAttestationPool = new AggregatedAttestationPool();
readonly aggregatedAttestationPool: AggregatedAttestationPool;
readonly syncCommitteeMessagePool: SyncCommitteeMessagePool;
readonly syncContributionAndProofPool = new SyncContributionAndProofPool();
readonly opPool = new OpPool();
Expand Down Expand Up @@ -226,7 +226,13 @@ export class BeaconChain implements IBeaconChain {
if (!clock) clock = new Clock({config, genesisTime: this.genesisTime, signal});

const preAggregateCutOffTime = (2 / 3) * this.config.SECONDS_PER_SLOT;
this.attestationPool = new AttestationPool(clock, preAggregateCutOffTime, this.opts?.preaggregateSlotDistance);
this.attestationPool = new AttestationPool(
config,
clock,
preAggregateCutOffTime,
this.opts?.preaggregateSlotDistance
);
this.aggregatedAttestationPool = new AggregatedAttestationPool(this.config);
this.syncCommitteeMessagePool = new SyncCommitteeMessagePool(
clock,
preAggregateCutOffTime,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import {BitArray, toHexString} from "@chainsafe/ssz";
import {
ForkName,
ForkSeq,
isForkPostElectra,
MAX_ATTESTATIONS,
MAX_ATTESTATIONS_ELECTRA,
MAX_COMMITTEES_PER_SLOT,
Expand Down Expand Up @@ -30,6 +31,7 @@ import {
} from "@lodestar/state-transition";
import {IForkChoice, EpochDifference} from "@lodestar/fork-choice";
import {MapDef, toRootHex, assert} from "@lodestar/utils";
import {ChainForkConfig} from "@lodestar/config";
import {intersectUint8Arrays, IntersectResult} from "../../util/bitArray.js";
import {pruneBySlot, signatureFromBytesNoCheck} from "./utils.js";
import {InsertOutcome} from "./types.js";
Expand Down Expand Up @@ -101,6 +103,8 @@ export class AggregatedAttestationPool {
>(() => new Map<DataRootHex, Map<CommitteeIndex, MatchingDataAttestationGroup>>());
private lowestPermissibleSlot = 0;

constructor(private readonly config: ChainForkConfig) {}

/** For metrics to track size of the pool */
getAttestationCount(): {attestationCount: number; attestationDataCount: number} {
let attestationCount = 0;
Expand Down Expand Up @@ -136,10 +140,20 @@ export class AggregatedAttestationPool {
attestationGroupByIndex = new Map<CommitteeIndex, MatchingDataAttestationGroup>();
attestationGroupByIndexByDataHash.set(dataRootHex, attestationGroupByIndex);
}
const committeeIndex = isElectraAttestation(attestation)
? // this attestation is added to pool after validation
attestation.committeeBits.getSingleTrueBit()
: attestation.data.index;

let committeeIndex;

if (isForkPostElectra(this.config.getForkName(slot))) {
if (!isElectraAttestation(attestation)) {
throw Error(`Attestation should be type electra.Attestation for slot ${slot}`);
}
committeeIndex = attestation.committeeBits.getSingleTrueBit();
} else {
if (isElectraAttestation(attestation)) {
throw Error(`Attestation should be type phase0.Attestation for slot ${slot}`);
}
committeeIndex = attestation.data.index;
}
// this should not happen because attestation should be validated before reaching this
assert.notNull(committeeIndex, "Committee index should not be null in aggregated attestation pool");
let attestationGroup = attestationGroupByIndex.get(committeeIndex);
Expand Down Expand Up @@ -390,6 +404,10 @@ export class AggregatedAttestationPool {

/**
* Get all attestations optionally filtered by `attestation.data.slot`
* Note this function is not fork aware and can potentially return a mix
* of phase0.Attestations and electra.Attestations.
* Caller of this function is expected to filtered result if they desire
* a homogenous array.
* @param bySlot slot to filter, `bySlot === attestation.data.slot`
*/
getAll(bySlot?: Slot): Attestation[] {
Expand Down Expand Up @@ -504,8 +522,15 @@ export class MatchingDataAttestationGroup {
*/
getAttestationsForBlock(fork: ForkName, notSeenAttestingIndices: Set<number>): AttestationNonParticipant[] {
const attestations: AttestationNonParticipant[] = [];
const forkSeq = ForkSeq[fork];
const isPostElectra = isForkPostElectra(fork);
for (const {attestation} of this.attestations) {
if (
(isPostElectra && !isElectraAttestation(attestation)) ||
(!isPostElectra && isElectraAttestation(attestation))
) {
continue;
}

let notSeenAttesterCount = 0;
const {aggregationBits} = attestation;
for (const notSeenIndex of notSeenAttestingIndices) {
Expand All @@ -514,13 +539,12 @@ export class MatchingDataAttestationGroup {
}
}

// if fork >= electra, should return electra-only attestations
if (notSeenAttesterCount > 0 && (forkSeq < ForkSeq.electra || isElectraAttestation(attestation))) {
if (notSeenAttesterCount > 0) {
attestations.push({attestation, notSeenAttesterCount});
}
}

const maxAttestation = forkSeq >= ForkSeq.electra ? MAX_ATTESTATIONS_PER_GROUP_ELECTRA : MAX_ATTESTATIONS_PER_GROUP;
const maxAttestation = isPostElectra ? MAX_ATTESTATIONS_PER_GROUP_ELECTRA : MAX_ATTESTATIONS_PER_GROUP;
if (attestations.length <= maxAttestation) {
return attestations;
} else {
Expand Down
34 changes: 27 additions & 7 deletions packages/beacon-node/src/chain/opPools/attestationPool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ import {BitArray} from "@chainsafe/ssz";
import {Signature, aggregateSignatures} from "@chainsafe/blst";
import {Slot, RootHex, isElectraAttestation, Attestation} from "@lodestar/types";
import {MapDef, assert} from "@lodestar/utils";
import {isForkPostElectra} from "@lodestar/params";
import {ChainForkConfig} from "@lodestar/config";
import {IClock} from "../../util/clock.js";
import {InsertOutcome, OpPoolError, OpPoolErrorCode} from "./types.js";
import {pruneBySlot, signatureFromBytesNoCheck} from "./utils.js";
import {isElectraAggregate, pruneBySlot, signatureFromBytesNoCheck} from "./utils.js";

/**
* The number of slots that will be stored in the pool.
Expand All @@ -28,14 +30,15 @@ type AggregateFastPhase0 = {
signature: Signature;
};

type AggregateFastElectra = AggregateFastPhase0 & {committeeBits: BitArray};
export type AggregateFastElectra = AggregateFastPhase0 & {committeeBits: BitArray};

type AggregateFast = AggregateFastPhase0 | AggregateFastElectra;
export type AggregateFast = AggregateFastPhase0 | AggregateFastElectra;

/** Hex string of DataRoot `TODO` */
type DataRootHex = string;

type CommitteeIndex = number;
/** CommitteeIndex must be null for pre-electra. Must not be null post-electra */
type CommitteeIndex = number | null;

/**
* A pool of `Attestation` that is specially designed to store "unaggregated" attestations from
Expand Down Expand Up @@ -68,6 +71,7 @@ export class AttestationPool {
private lowestPermissibleSlot = 0;

constructor(
private readonly config: ChainForkConfig,
private readonly clock: IClock,
private readonly cutOffSecFromSlot: number,
private readonly preaggregateSlotDistance = 0
Expand Down Expand Up @@ -103,6 +107,7 @@ export class AttestationPool {
*/
add(committeeIndex: CommitteeIndex, attestation: Attestation, attDataRootHex: RootHex): InsertOutcome {
const slot = attestation.data.slot;
const fork = this.config.getForkName(slot);
const lowestPermissibleSlot = this.lowestPermissibleSlot;

// Reject any attestations that are too old.
Expand All @@ -121,8 +126,14 @@ export class AttestationPool {
throw new OpPoolError({code: OpPoolErrorCode.REACHED_MAX_PER_SLOT});
}

// this should not happen because attestation should be validated before reaching this
assert.notNull(committeeIndex, "Committee index should not be null in attestation pool");
if (isForkPostElectra(fork)) {
// Electra only: this should not happen because attestation should be validated before reaching this
assert.notNull(committeeIndex, "Committee index should not be null in attestation pool post-electra");
assert.true(isElectraAttestation(attestation), "Attestation should be type electra.Attestation");
} else {
assert.true(!isElectraAttestation(attestation), "Attestation should be type phase0.Attestation");
committeeIndex = null; // For pre-electra, committee index info is encoded in attDataRootIndex
}

// Pre-aggregate the contribution with existing items
let aggregateByIndex = aggregateByRoot.get(attDataRootHex);
Expand All @@ -144,14 +155,23 @@ export class AttestationPool {
/**
* For validator API to get an aggregate
*/
// TODO Electra: Change attestation pool to accomodate pre-electra request
getAggregate(slot: Slot, committeeIndex: CommitteeIndex, dataRootHex: RootHex): Attestation | null {
const fork = this.config.getForkName(slot);
const isPostElectra = isForkPostElectra(fork);
committeeIndex = isPostElectra ? committeeIndex : null;

const aggregate = this.aggregateByIndexByRootBySlot.get(slot)?.get(dataRootHex)?.get(committeeIndex);
if (!aggregate) {
// TODO: Add metric for missing aggregates
return null;
}

if (isPostElectra) {
assert.true(isElectraAggregate(aggregate), "Aggregate should be type AggregateFastElectra");
} else {
assert.true(!isElectraAggregate(aggregate), "Aggregate should be type AggregateFastPhase0");
}

return fastToAttestation(aggregate);
}

Expand Down
5 changes: 5 additions & 0 deletions packages/beacon-node/src/chain/opPools/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import {Signature} from "@chainsafe/blst";
import {BLS_WITHDRAWAL_PREFIX} from "@lodestar/params";
import {CachedBeaconStateAllForks} from "@lodestar/state-transition";
import {Slot, capella} from "@lodestar/types";
import {AggregateFast, AggregateFastElectra} from "./attestationPool.js";

/**
* Prune a Map indexed by slot to keep the most recent slots, up to `slotsRetained`
Expand Down Expand Up @@ -58,3 +59,7 @@ export function isValidBlsToExecutionChangeForBlockInclusion(

return true;
}

export function isElectraAggregate(aggregate: AggregateFast): aggregate is AggregateFastElectra {
return (aggregate as AggregateFastElectra).committeeBits !== undefined;
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,14 @@ import {BitArray} from "@chainsafe/ssz";
import {CommitteeIndex, phase0, RootHex, Slot} from "@lodestar/types";
import {MapDef} from "@lodestar/utils";
import {Metrics} from "../../metrics/metrics.js";
import {SeenAttDataKey} from "../../util/sszBytes.js";
import {InsertOutcome} from "../opPools/types.js";

export type SeenAttDataKey = AttDataBase64 | AttDataCommitteeBitsBase64;
// pre-electra, AttestationData is used to cache attestations
type AttDataBase64 = string;
// electra, AttestationData + CommitteeBits are used to cache attestations
type AttDataCommitteeBitsBase64 = string;

export type AttestationDataCacheEntry = {
// part of shuffling data, so this does not take memory
committeeValidatorIndices: Uint32Array;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ import {toRootHex} from "@lodestar/utils";
import {IBeaconChain} from "..";
import {AttestationError, AttestationErrorCode, GossipAction} from "../errors/index.js";
import {RegenCaller} from "../regen/index.js";
import {getSeenAttDataKeyFromSignedAggregateAndProof} from "../../util/sszBytes.js";
import {getSelectionProofSignatureSet, getAggregateAndProofSignatureSet} from "./signatureSets/index.js";
import {
getAttestationDataSigningRoot,
getCommitteeIndices,
getSeenAttDataKeyFromSignedAggregateAndProof,
getShufflingForAttestationVerification,
verifyHeadBlockAndTargetRoot,
verifyPropagationSlotRange,
Expand Down Expand Up @@ -71,9 +71,7 @@ async function validateAggregateAndProof(
const attData = aggregate.data;
const attSlot = attData.slot;

const seenAttDataKey = serializedData
? getSeenAttDataKeyFromSignedAggregateAndProof(ForkSeq[fork], serializedData)
: null;
const seenAttDataKey = serializedData ? getSeenAttDataKeyFromSignedAggregateAndProof(fork, serializedData) : null;
const cachedAttData = seenAttDataKey ? chain.seenAttestationDatas.get(attSlot, seenAttDataKey) : null;

let attIndex;
Expand Down
Loading

0 comments on commit b572236

Please sign in to comment.