Skip to content

Commit

Permalink
Initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
ensi321 committed Aug 5, 2024
1 parent 9c0d54e commit 97b089a
Show file tree
Hide file tree
Showing 11 changed files with 126 additions and 29 deletions.
14 changes: 11 additions & 3 deletions packages/beacon-node/src/api/impl/beacon/pool/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import {routes} from "@lodestar/api";
import {ApplicationMethods} from "@lodestar/api/server";
import {Epoch, ssz} from "@lodestar/types";
import {ForkName, SYNC_COMMITTEE_SUBNET_SIZE} from "@lodestar/params";
import {Epoch, isElectraAttestation, ssz} from "@lodestar/types";
import {ForkName, ForkSeq, SYNC_COMMITTEE_SUBNET_SIZE} from "@lodestar/params";
import {validateApiAttestation} from "../../../../chain/validation/index.js";
import {validateApiAttesterSlashing} from "../../../../chain/validation/attesterSlashing.js";
import {validateApiProposerSlashing} from "../../../../chain/validation/proposerSlashing.js";
Expand All @@ -26,7 +26,9 @@ export function getBeaconPoolApi({
return {
async getPoolAttestations({slot, committeeIndex}) {
// Already filtered by slot
let attestations = chain.aggregatedAttestationPool.getAll(slot);
let attestations = chain.aggregatedAttestationPool
.getAll(slot)
.filter((attestation) => !isElectraAttestation(attestation));

if (committeeIndex !== undefined) {
attestations = attestations.filter((attestation) => committeeIndex === attestation.data.index);
Expand All @@ -39,6 +41,12 @@ export function getBeaconPoolApi({
// Already filtered by slot
let attestations = chain.aggregatedAttestationPool.getAll(slot);
const fork = chain.config.getForkName(slot ?? attestations[0]?.data.slot ?? chain.clock.currentSlot);
const isAfterElectra = ForkSeq[fork] >= ForkSeq.electra;
attestations = attestations.filter(
(attestation) =>
(isAfterElectra && isElectraAttestation(attestation)) ||
(!isAfterElectra && !isElectraAttestation(attestation))
);

if (committeeIndex !== undefined) {
attestations = attestations.filter((attestation) => committeeIndex === attestation.data.index);
Expand Down
19 changes: 17 additions & 2 deletions packages/beacon-node/src/api/impl/validator/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1068,8 +1068,23 @@ export function getValidatorApi(
},

// TODO Electra: Implement getAggregatedAttestation to properly handle pre-electra
async getAggregatedAttestation() {
throw new Error("Not implemented. Use getAggregatedAttestationV2 for now.");
async getAggregatedAttestation({attestationDataRoot, slot}) {
notWhileSyncing();

await waitForSlot(slot); // Must never request for a future slot > currentSlot

const dataRootHex = toHex(attestationDataRoot);
const aggregate = chain.attestationPool.getAggregate(slot, null, dataRootHex);

if (!aggregate) {
throw new ApiError(404, `No aggregated attestation for slot=${slot}, dataRoot=${dataRootHex}`);
}

metrics?.production.producedAggregateParticipants.observe(aggregate.aggregationBits.getTrueBitIndexes().length);

return {
data: aggregate,
};
},

async getAggregatedAttestationV2({attestationDataRoot, slot, committeeIndex}) {
Expand Down
10 changes: 8 additions & 2 deletions packages/beacon-node/src/chain/chain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ export class BeaconChain implements IBeaconChain {

// Ops pool
readonly attestationPool: AttestationPool;
readonly aggregatedAttestationPool = new AggregatedAttestationPool();
readonly aggregatedAttestationPool: AggregatedAttestationPool;
readonly syncCommitteeMessagePool: SyncCommitteeMessagePool;
readonly syncContributionAndProofPool = new SyncContributionAndProofPool();
readonly opPool = new OpPool();
Expand Down Expand Up @@ -226,7 +226,13 @@ export class BeaconChain implements IBeaconChain {
if (!clock) clock = new Clock({config, genesisTime: this.genesisTime, signal});

const preAggregateCutOffTime = (2 / 3) * this.config.SECONDS_PER_SLOT;
this.attestationPool = new AttestationPool(clock, preAggregateCutOffTime, this.opts?.preaggregateSlotDistance);
this.attestationPool = new AttestationPool(
this.config,
clock,
preAggregateCutOffTime,
this.opts?.preaggregateSlotDistance
);
this.aggregatedAttestationPool = new AggregatedAttestationPool(this.config);
this.syncCommitteeMessagePool = new SyncCommitteeMessagePool(
clock,
preAggregateCutOffTime,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import {
} from "@lodestar/state-transition";
import {IForkChoice, EpochDifference} from "@lodestar/fork-choice";
import {toHex, MapDef, assert} from "@lodestar/utils";
import {ChainForkConfig} from "@lodestar/config";
import {intersectUint8Arrays, IntersectResult} from "../../util/bitArray.js";
import {pruneBySlot, signatureFromBytesNoCheck} from "./utils.js";
import {InsertOutcome} from "./types.js";
Expand Down Expand Up @@ -101,6 +102,8 @@ export class AggregatedAttestationPool {
>(() => new Map<DataRootHex, Map<CommitteeIndex, MatchingDataAttestationGroup>>());
private lowestPermissibleSlot = 0;

constructor(private readonly config: ChainForkConfig) {}

/** For metrics to track size of the pool */
getAttestationCount(): {attestationCount: number; attestationDataCount: number} {
let attestationCount = 0;
Expand All @@ -123,6 +126,7 @@ export class AggregatedAttestationPool {
committee: Uint32Array
): InsertOutcome {
const slot = attestation.data.slot;
const fork = this.config.getForkSeq(slot);
const lowestPermissibleSlot = this.lowestPermissibleSlot;

// Reject any attestations that are too old.
Expand All @@ -136,10 +140,22 @@ export class AggregatedAttestationPool {
attestationGroupByIndex = new Map<CommitteeIndex, MatchingDataAttestationGroup>();
attestationGroupByIndexByDataHash.set(dataRootHex, attestationGroupByIndex);
}
const committeeIndex = isElectraAttestation(attestation)
? // this attestation is added to pool after validation
attestation.committeeBits.getSingleTrueBit()
: attestation.data.index;

let committeeIndex;

if (fork >= ForkSeq.electra) {
if (isElectraAttestation(attestation)) {
committeeIndex = attestation.committeeBits.getSingleTrueBit();
} else {
throw new Error("");
}
} else {
if (!isElectraAttestation(attestation)) {
committeeIndex = attestation.data.index;
} else {
throw new Error("");
}
}
// this should not happen because attestation should be validated before reaching this
assert.notNull(committeeIndex, "Committee index should not be null in aggregated attestation pool");
let attestationGroup = attestationGroupByIndex.get(committeeIndex);
Expand Down Expand Up @@ -390,6 +406,10 @@ export class AggregatedAttestationPool {

/**
* Get all attestations optionally filtered by `attestation.data.slot`
* Note this function is not fork aware and can potentially return a mix
* of phase0.Attestations and electra.Attestations.
* Caller of this function is expected to filtered result if they desire
* a homogenous array.
* @param bySlot slot to filter, `bySlot === attestation.data.slot`
*/
getAll(bySlot?: Slot): Attestation[] {
Expand Down Expand Up @@ -505,7 +525,16 @@ export class MatchingDataAttestationGroup {
getAttestationsForBlock(fork: ForkName, notSeenAttestingIndices: Set<number>): AttestationNonParticipant[] {
const attestations: AttestationNonParticipant[] = [];
const forkSeq = ForkSeq[fork];
const isAfterElectra = forkSeq >= ForkSeq.electra;
for (const {attestation} of this.attestations) {
if (
(isAfterElectra && !isElectraAttestation(attestation)) ||
(!isAfterElectra && isElectraAttestation(attestation))
) {
// TODO Electra: log warning
continue;
}

let notSeenAttesterCount = 0;
const {aggregationBits} = attestation;
for (const notSeenIndex of notSeenAttestingIndices) {
Expand All @@ -515,12 +544,12 @@ export class MatchingDataAttestationGroup {
}

// if fork >= electra, should return electra-only attestations
if (notSeenAttesterCount > 0 && (forkSeq < ForkSeq.electra || isElectraAttestation(attestation))) {
if (notSeenAttesterCount > 0) {
attestations.push({attestation, notSeenAttesterCount});
}
}

const maxAttestation = forkSeq >= ForkSeq.electra ? MAX_ATTESTATIONS_PER_GROUP_ELECTRA : MAX_ATTESTATIONS_PER_GROUP;
const maxAttestation = isAfterElectra ? MAX_ATTESTATIONS_PER_GROUP_ELECTRA : MAX_ATTESTATIONS_PER_GROUP;
if (attestations.length <= maxAttestation) {
return attestations;
} else {
Expand Down
34 changes: 27 additions & 7 deletions packages/beacon-node/src/chain/opPools/attestationPool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ import {BitArray} from "@chainsafe/ssz";
import {Signature, aggregateSignatures} from "@chainsafe/blst";
import {Slot, RootHex, isElectraAttestation, Attestation} from "@lodestar/types";
import {MapDef, assert} from "@lodestar/utils";
import {ForkSeq} from "@lodestar/params";
import {ChainForkConfig} from "@lodestar/config";
import {IClock} from "../../util/clock.js";
import {InsertOutcome, OpPoolError, OpPoolErrorCode} from "./types.js";
import {pruneBySlot, signatureFromBytesNoCheck} from "./utils.js";
import {isElectraAggregate, pruneBySlot, signatureFromBytesNoCheck} from "./utils.js";

/**
* The number of slots that will be stored in the pool.
Expand All @@ -28,14 +30,14 @@ type AggregateFastPhase0 = {
signature: Signature;
};

type AggregateFastElectra = AggregateFastPhase0 & {committeeBits: BitArray};
export type AggregateFastElectra = AggregateFastPhase0 & {committeeBits: BitArray};

type AggregateFast = AggregateFastPhase0 | AggregateFastElectra;
export type AggregateFast = AggregateFastPhase0 | AggregateFastElectra;

/** Hex string of DataRoot `TODO` */
type DataRootHex = string;

type CommitteeIndex = number;
type CommitteeIndex = number | null;

/**
* A pool of `Attestation` that is specially designed to store "unaggregated" attestations from
Expand Down Expand Up @@ -68,6 +70,7 @@ export class AttestationPool {
private lowestPermissibleSlot = 0;

constructor(
private readonly config: ChainForkConfig,
private readonly clock: IClock,
private readonly cutOffSecFromSlot: number,
private readonly preaggregateSlotDistance = 0
Expand Down Expand Up @@ -103,6 +106,7 @@ export class AttestationPool {
*/
add(committeeIndex: CommitteeIndex, attestation: Attestation, attDataRootHex: RootHex): InsertOutcome {
const slot = attestation.data.slot;
const fork = this.config.getForkSeq(slot);
const lowestPermissibleSlot = this.lowestPermissibleSlot;

// Reject any attestations that are too old.
Expand All @@ -121,8 +125,15 @@ export class AttestationPool {
throw new OpPoolError({code: OpPoolErrorCode.REACHED_MAX_PER_SLOT});
}

// this should not happen because attestation should be validated before reaching this
assert.notNull(committeeIndex, "Committee index should not be null in attestation pool");
// TODO Electra: Use `isForkElectra` after the other PR is merged
if (fork >= ForkSeq.electra) {
// Electra only: this should not happen because attestation should be validated before reaching this
assert.notNull(committeeIndex, "Committee index should not be null in attestation pool post-electra");
assert.true(isElectraAttestation(attestation), "Attestation should be type electra.Attestation");
} else {
assert.true(!isElectraAttestation(attestation), "Attestation should be type phase0.Attestation");
committeeIndex = null; // For pre-electra, committee index info is encoded in attDataRootIndex
}

// Pre-aggregate the contribution with existing items
let aggregateByIndex = aggregateByRoot.get(attDataRootHex);
Expand All @@ -144,14 +155,23 @@ export class AttestationPool {
/**
* For validator API to get an aggregate
*/
// TODO Electra: Change attestation pool to accomodate pre-electra request
getAggregate(slot: Slot, committeeIndex: CommitteeIndex, dataRootHex: RootHex): Attestation | null {
const fork = this.config.getForkSeq(slot);
const isAfterElectra = fork >= ForkSeq.electra;
committeeIndex = isAfterElectra ? committeeIndex : null;

const aggregate = this.aggregateByIndexByRootBySlot.get(slot)?.get(dataRootHex)?.get(committeeIndex);
if (!aggregate) {
// TODO: Add metric for missing aggregates
return null;
}

if (isAfterElectra) {
assert.true(isElectraAggregate(aggregate), "Aggregate should be type AggregateFastElectra");
} else {
assert.true(!isElectraAggregate(aggregate), "Aggregate should be type AggregateFastPhase0");
}

return fastToAttestation(aggregate);
}

Expand Down
5 changes: 5 additions & 0 deletions packages/beacon-node/src/chain/opPools/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import {Signature} from "@chainsafe/blst";
import {BLS_WITHDRAWAL_PREFIX} from "@lodestar/params";
import {CachedBeaconStateAllForks} from "@lodestar/state-transition";
import {Slot, capella} from "@lodestar/types";
import {AggregateFast, AggregateFastElectra} from "./attestationPool.js";

/**
* Prune a Map indexed by slot to keep the most recent slots, up to `slotsRetained`
Expand Down Expand Up @@ -58,3 +59,7 @@ export function isValidBlsToExecutionChangeForBlockInclusion(

return true;
}

export function isElectraAggregate(aggregate: AggregateFast): aggregate is AggregateFastElectra {
return (aggregate as AggregateFastElectra).committeeBits !== undefined;
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import {mapValues} from "@lodestar/utils";
import {ForkSeq} from "@lodestar/params";
import {GossipType} from "../../gossip/interface.js";
import {PendingGossipsubMessage} from "../types.js";
import {getSeenAttDataKey} from "../../../util/sszBytes.js";
import {getGossipAttestationIndex} from "../../../util/sszBytes.js";
import {LinearGossipQueue} from "./linear.js";
import {
DropType,
Expand Down Expand Up @@ -88,8 +87,8 @@ const indexedGossipQueueOpts: {
[GossipType.beacon_attestation]: {
maxLength: 24576,
indexFn: (item: PendingGossipsubMessage) => {
const {topic, msg} = item;
return getSeenAttDataKey(ForkSeq[topic.fork], msg.data);
// Note indexFn is fork agnostic despite changes introduced in Electra
return getGossipAttestationIndex(item.msg.data);
},
minChunkSize: MIN_SIGNATURE_SETS_TO_BATCH_VERIFY,
maxChunkSize: MAX_GOSSIP_ATTESTATION_BATCH_SIZE,
Expand Down
7 changes: 7 additions & 0 deletions packages/beacon-node/src/util/sszBytes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,13 @@ export function getSeenAttDataKeyPhase0(data: Uint8Array): AttDataBase64 | null
return toBase64(data.slice(VARIABLE_FIELD_OFFSET, VARIABLE_FIELD_OFFSET + ATTESTATION_DATA_SIZE));
}

/**
* Alias of `getSeenAttDataKeyPhase0` specifically for batch handling indexing in gossip queue
*/
export function getGossipAttestationIndex(data: Uint8Array): AttDataBase64 | null {
return getSeenAttDataKeyPhase0(data);
}

/**
* Extract aggregation bits from attestation serialized bytes.
* Return null if data is not long enough to extract aggregation bits.
Expand Down
2 changes: 1 addition & 1 deletion packages/beacon-node/test/mocks/mockedBeaconChain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ vi.mock("../../src/chain/chain.js", async (importActual) => {
// @ts-expect-error
eth1: new Eth1ForBlockProduction(),
opPool: new OpPool(),
aggregatedAttestationPool: new AggregatedAttestationPool(),
aggregatedAttestationPool: new AggregatedAttestationPool(config),
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-expect-error
beaconProposerCache: new BeaconProposerCache(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ import {
import {HISTORICAL_ROOTS_LIMIT, SLOTS_PER_EPOCH} from "@lodestar/params";
import {ExecutionStatus, ForkChoice, IForkChoiceStore, ProtoArray, DataAvailabilityStatus} from "@lodestar/fork-choice";
import {ssz} from "@lodestar/types";
// eslint-disable-next-line import/no-relative-packages
import {generatePerfTestCachedStateAltair} from "../../../../../state-transition/test/perf/util.js";

import {createChainForkConfig, defaultChainConfig} from "@lodestar/config";
import {generatePerfTestCachedStateAltair} from "@lodestar/state-transition/test/perf/util.js";
import {AggregatedAttestationPool} from "../../../../src/chain/opPools/aggregatedAttestationPool.js";
import {computeAnchorCheckpoint} from "../../../../src/chain/initState.js";

Expand Down Expand Up @@ -230,7 +231,10 @@ function getAggregatedAttestationPool(
numMissedVotes: number,
numBadVotes: number
): AggregatedAttestationPool {
const pool = new AggregatedAttestationPool();
const config = createChainForkConfig({
...defaultChainConfig,
});
const pool = new AggregatedAttestationPool(config);
for (let epochSlot = 0; epochSlot < SLOTS_PER_EPOCH; epochSlot++) {
const slot = state.slot - 1 - epochSlot;
const epoch = computeEpochAtSlot(slot);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import {
} from "@lodestar/params";
import {ssz, phase0} from "@lodestar/types";
import {CachedBeaconStateAltair} from "@lodestar/state-transition/src/types.js";
import {createChainForkConfig, defaultChainConfig} from "@lodestar/config";
import {MockedForkChoice, getMockedForkChoice} from "../../../mocks/mockedBeaconChain.js";
import {
aggregateConsolidation,
Expand All @@ -36,6 +37,9 @@ const validSignature = fromHexString(
describe("AggregatedAttestationPool", function () {
let pool: AggregatedAttestationPool;
const fork = ForkName.altair;
const config = createChainForkConfig({
...defaultChainConfig,
});
const altairForkEpoch = 2020;
const currentEpoch = altairForkEpoch + 10;
const currentSlot = SLOTS_PER_EPOCH * currentEpoch;
Expand Down Expand Up @@ -79,7 +83,7 @@ describe("AggregatedAttestationPool", function () {
let forkchoiceStub: MockedForkChoice;

beforeEach(() => {
pool = new AggregatedAttestationPool();
pool = new AggregatedAttestationPool(config);
altairState = originalState.clone();
forkchoiceStub = getMockedForkChoice();
});
Expand Down

0 comments on commit 97b089a

Please sign in to comment.