Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Provide committeesSize to MatchingDataAttestationGroup #8304

Closed
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ class AttestationManagerIntegrationTest {

private final AggregatingAttestationPool attestationPool =
new AggregatingAttestationPool(
spec, new NoOpMetricsSystem(), DEFAULT_MAXIMUM_ATTESTATION_COUNT);
spec, recentChainData, new NoOpMetricsSystem(), DEFAULT_MAXIMUM_ATTESTATION_COUNT);
private final MergeTransitionBlockValidator transitionBlockValidator =
new MergeTransitionBlockValidator(spec, recentChainData, ExecutionLayerChannel.NOOP);
private final ForkChoice forkChoice =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

package tech.pegasys.teku.statetransition.attestation;

import it.unimi.dsi.fastutil.ints.Int2IntMap;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
Expand All @@ -25,6 +26,7 @@
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.function.Supplier;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.tuweni.bytes.Bytes;
Expand All @@ -38,9 +40,11 @@
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.spec.Spec;
import tech.pegasys.teku.spec.datastructures.attestation.ValidatableAttestation;
import tech.pegasys.teku.spec.datastructures.blocks.SlotAndBlockRoot;
import tech.pegasys.teku.spec.datastructures.operations.Attestation;
import tech.pegasys.teku.spec.datastructures.operations.AttestationData;
import tech.pegasys.teku.spec.datastructures.state.beaconstate.BeaconState;
import tech.pegasys.teku.storage.client.RecentChainData;

/**
* Maintains a pool of attestations. Attestations can be retrieved either for inclusion in a block
Expand Down Expand Up @@ -80,13 +84,19 @@ public class AggregatingAttestationPool implements SlotEventsChannel {
private final NavigableMap<UInt64, Set<Bytes>> dataHashBySlot = new TreeMap<>();

private final Spec spec;
private final AtomicInteger size = new AtomicInteger(0);
private final RecentChainData recentChainData;
private final SettableGauge sizeGauge;
private final int maximumAttestationCount;

private final AtomicInteger size = new AtomicInteger(0);

public AggregatingAttestationPool(
final Spec spec, final MetricsSystem metricsSystem, final int maximumAttestationCount) {
final Spec spec,
final RecentChainData recentChainData,
final MetricsSystem metricsSystem,
final int maximumAttestationCount) {
this.spec = spec;
this.recentChainData = recentChainData;
this.sizeGauge =
SettableGauge.create(
metricsSystem,
Expand All @@ -98,7 +108,15 @@ public AggregatingAttestationPool(

public synchronized void add(final ValidatableAttestation attestation) {
final AttestationData attestationData = attestation.getAttestation().getData();
final boolean add = getOrCreateAttestationGroup(attestationData).add(attestation);
final Supplier<Int2IntMap> commiteesSizeSupplier =
attestation
.getCommitteesSize()
.map(committeesSize -> (Supplier<Int2IntMap>) () -> committeesSize)
// fallback to querying the state in the cases when the committeesSize is not available
// in the ValidatableAttestation.
.orElseGet(() -> getCommitteesSizeSupplierUsingTheState(attestationData));
final boolean add =
getOrCreateAttestationGroup(attestationData, commiteesSizeSupplier).add(attestation);
if (add) {
updateSize(1);
}
Expand All @@ -112,14 +130,36 @@ public synchronized void add(final ValidatableAttestation attestation) {
}
}

/**
* @param committeesSizeSupplier Required for aggregating attestations as per <a
* href="https://eips.ethereum.org/EIPS/eip-7549">EIP-7549</a>
*/
private MatchingDataAttestationGroup getOrCreateAttestationGroup(
final AttestationData attestationData) {
final AttestationData attestationData, final Supplier<Int2IntMap> committeesSizeSupplier) {
dataHashBySlot
.computeIfAbsent(attestationData.getSlot(), slot -> new HashSet<>())
.add(attestationData.hashTreeRoot());
return attestationGroupByDataHash.computeIfAbsent(
attestationData.hashTreeRoot(),
key -> new MatchingDataAttestationGroup(spec, attestationData));
key -> new MatchingDataAttestationGroup(spec, attestationData, committeesSizeSupplier));
}

// We only have the committees size already available via attestations received in the gossip
// flow, so querying the state is required when we are tracking attestations coming from a block.
private Supplier<Int2IntMap> getCommitteesSizeSupplierUsingTheState(
final AttestationData attestationData) {
return () -> {
final SlotAndBlockRoot slotAndBlockRoot =
new SlotAndBlockRoot(attestationData.getSlot(), attestationData.getBeaconBlockRoot());
final BeaconState state =
recentChainData
.retrieveStateAtSlot(slotAndBlockRoot)
// the attestation pool flow is entirely synchronous so joining here
.join()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

eep here be dragons...

.orElseThrow(
() -> new IllegalStateException("No state available for " + slotAndBlockRoot));
return spec.getBeaconCommitteesSize(state, attestationData.getSlot());
};
}

@Override
Expand Down Expand Up @@ -158,7 +198,9 @@ public synchronized void onAttestationsIncludedInBlock(

private void onAttestationIncludedInBlock(final UInt64 slot, final Attestation attestation) {
final AttestationData attestationData = attestation.getData();
final MatchingDataAttestationGroup attestations = getOrCreateAttestationGroup(attestationData);
final MatchingDataAttestationGroup attestations =
getOrCreateAttestationGroup(
attestationData, getCommitteesSizeSupplierUsingTheState(attestationData));
final int numRemoved = attestations.onAttestationIncludedInBlock(slot, attestation);
updateSize(-numRemoved);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

package tech.pegasys.teku.statetransition.attestation;

import it.unimi.dsi.fastutil.ints.Int2IntMap;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashSet;
Expand All @@ -23,6 +24,7 @@
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.TreeMap;
import java.util.function.Supplier;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.tuweni.bytes.Bytes32;
Expand Down Expand Up @@ -72,7 +74,10 @@
/** Precalculated combined list of included validators across all blocks. */
private SszBitlist includedValidators;

public MatchingDataAttestationGroup(final Spec spec, final AttestationData attestationData) {
public MatchingDataAttestationGroup(
final Spec spec,
final AttestationData attestationData,
@SuppressWarnings("unused") final Supplier<Int2IntMap> commiteesSizeSupplier) {

Check notice

Code scanning / CodeQL

Useless parameter Note

The parameter 'commiteesSizeSupplier' is never used.
this.spec = spec;
this.attestationData = attestationData;
this.includedValidators = createEmptyAggregationBits();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import tech.pegasys.teku.spec.datastructures.state.beaconstate.BeaconState;
import tech.pegasys.teku.spec.logic.common.operations.validation.AttestationDataValidator.AttestationInvalidReason;
import tech.pegasys.teku.spec.util.DataStructureUtil;
import tech.pegasys.teku.storage.client.RecentChainData;

@TestSpecContext(milestone = {PHASE0, ELECTRA})
class AggregatingAttestationPoolTest {
Expand All @@ -61,10 +62,14 @@ class AggregatingAttestationPoolTest {
private AttestationSchema<?> attestationSchema;
private Optional<UInt64> committeeIndex;
private final Spec mockSpec = mock(Spec.class);
private final RecentChainData mockRecentChainData = mock(RecentChainData.class);

private AggregatingAttestationPool aggregatingPool =
new AggregatingAttestationPool(
mockSpec, new NoOpMetricsSystem(), DEFAULT_MAXIMUM_ATTESTATION_COUNT);
mockSpec,
mockRecentChainData,
new NoOpMetricsSystem(),
DEFAULT_MAXIMUM_ATTESTATION_COUNT);

private final AttestationForkChecker forkChecker = mock(AttestationForkChecker.class);

Expand Down Expand Up @@ -385,7 +390,8 @@ public void getSize_shouldDecrementForAllRemovedAttestationsWhileKeepingOthers()

@TestTemplate
void shouldRemoveOldSlotsWhenMaximumNumberOfAttestationsReached() {
aggregatingPool = new AggregatingAttestationPool(mockSpec, new NoOpMetricsSystem(), 5);
aggregatingPool =
new AggregatingAttestationPool(mockSpec, mockRecentChainData, new NoOpMetricsSystem(), 5);
final AttestationData attestationData0 = dataStructureUtil.randomAttestationData(ZERO);
final AttestationData attestationData1 = dataStructureUtil.randomAttestationData(ONE);
final AttestationData attestationData2 =
Expand All @@ -409,7 +415,8 @@ void shouldRemoveOldSlotsWhenMaximumNumberOfAttestationsReached() {

@TestTemplate
void shouldNotRemoveLastSlotEvenWhenMaximumNumberOfAttestationsReached() {
aggregatingPool = new AggregatingAttestationPool(mockSpec, new NoOpMetricsSystem(), 5);
aggregatingPool =
new AggregatingAttestationPool(mockSpec, mockRecentChainData, new NoOpMetricsSystem(), 5);
final AttestationData attestationData = dataStructureUtil.randomAttestationData(ZERO);
addAttestationFromValidators(attestationData, 1);
addAttestationFromValidators(attestationData, 2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import static tech.pegasys.teku.spec.SpecMilestone.PHASE0;
import static tech.pegasys.teku.statetransition.attestation.AggregatorUtil.aggregateAttestations;

import it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap;
import java.util.Optional;
import java.util.function.Supplier;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -52,7 +53,7 @@ public void setUp(final SpecContext specContext) {
attestationSchema = spec.getGenesisSchemaDefinitions().getAttestationSchema();
dataStructureUtil = specContext.getDataStructureUtil();
attestationData = dataStructureUtil.randomAttestationData(SLOT);
group = new MatchingDataAttestationGroup(spec, attestationData);
group = new MatchingDataAttestationGroup(spec, attestationData, Int2IntOpenHashMap::new);
}

@TestTemplate
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1153,7 +1153,8 @@ protected void initSlotProcessor() {
public void initAttestationPool() {
LOG.debug("BeaconChainController.initAttestationPool()");
attestationPool =
new AggregatingAttestationPool(spec, metricsSystem, DEFAULT_MAXIMUM_ATTESTATION_COUNT);
new AggregatingAttestationPool(
spec, recentChainData, metricsSystem, DEFAULT_MAXIMUM_ATTESTATION_COUNT);
eventChannels.subscribe(SlotEventsChannel.class, attestationPool);
blockImporter.subscribeToVerifiedBlockAttestations(
attestationPool::onAttestationsIncludedInBlock);
Expand Down