Skip to content

Commit

Permalink
Mark local-el retrieved blobs as seen WRT gossip equivocation cache (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
tbenr authored Oct 4, 2024
1 parent 7d40ea4 commit 234f711
Show file tree
Hide file tree
Showing 6 changed files with 98 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.exception.ExceptionUtils;
Expand Down Expand Up @@ -63,6 +64,7 @@
import tech.pegasys.teku.statetransition.blobs.BlockBlobSidecarsTrackerFactory;
import tech.pegasys.teku.statetransition.blobs.BlockBlobSidecarsTrackersPool;
import tech.pegasys.teku.statetransition.block.BlockImportChannel;
import tech.pegasys.teku.statetransition.validation.BlobSidecarGossipValidator;
import tech.pegasys.teku.storage.client.RecentChainData;

public class BlockBlobSidecarsTrackersPoolImpl extends AbstractIgnoringFutureHistoricalSlot
Expand Down Expand Up @@ -98,6 +100,7 @@ public class BlockBlobSidecarsTrackersPoolImpl extends AbstractIgnoringFutureHis
private final AsyncRunner asyncRunner;
private final RecentChainData recentChainData;
private final ExecutionLayerChannel executionLayer;
private final Supplier<BlobSidecarGossipValidator> gossipValidatorSupplier;
private final Consumer<BlobSidecar> blobSidecarGossipPublisher;
private final int maxTrackers;

Expand Down Expand Up @@ -129,6 +132,7 @@ public class BlockBlobSidecarsTrackersPoolImpl extends AbstractIgnoringFutureHis
final AsyncRunner asyncRunner,
final RecentChainData recentChainData,
final ExecutionLayerChannel executionLayer,
final Supplier<BlobSidecarGossipValidator> gossipValidatorSupplier,
final Consumer<BlobSidecar> blobSidecarGossipPublisher,
final UInt64 historicalSlotTolerance,
final UInt64 futureSlotTolerance,
Expand All @@ -140,6 +144,7 @@ public class BlockBlobSidecarsTrackersPoolImpl extends AbstractIgnoringFutureHis
this.asyncRunner = asyncRunner;
this.recentChainData = recentChainData;
this.executionLayer = executionLayer;
this.gossipValidatorSupplier = gossipValidatorSupplier;
this.blobSidecarGossipPublisher = blobSidecarGossipPublisher;
this.maxTrackers = maxTrackers;
this.sizeGauge = sizeGauge;
Expand All @@ -159,6 +164,7 @@ public class BlockBlobSidecarsTrackersPoolImpl extends AbstractIgnoringFutureHis
final AsyncRunner asyncRunner,
final RecentChainData recentChainData,
final ExecutionLayerChannel executionLayer,
final Supplier<BlobSidecarGossipValidator> gossipValidatorSupplier,
final Consumer<BlobSidecar> blobSidecarGossipPublisher,
final UInt64 historicalSlotTolerance,
final UInt64 futureSlotTolerance,
Expand All @@ -171,6 +177,7 @@ public class BlockBlobSidecarsTrackersPoolImpl extends AbstractIgnoringFutureHis
this.asyncRunner = asyncRunner;
this.recentChainData = recentChainData;
this.executionLayer = executionLayer;
this.gossipValidatorSupplier = gossipValidatorSupplier;
this.blobSidecarGossipPublisher = blobSidecarGossipPublisher;
this.maxTrackers = maxTrackers;
this.sizeGauge = sizeGauge;
Expand Down Expand Up @@ -224,8 +231,8 @@ public synchronized void onNewBlobSidecar(
sizeGauge.set(++totalBlobSidecars, GAUGE_BLOB_SIDECARS_LABEL);
countBlobSidecar(remoteOrigin);
newBlobSidecarSubscribers.deliver(NewBlobSidecarSubscriber::onNewBlobSidecar, blobSidecar);
if (remoteOrigin.equals(LOCAL_EL)) {
blobSidecarGossipPublisher.accept(blobSidecar);
if (remoteOrigin.equals(LOCAL_EL) && slotAndBlockRoot.getSlot().equals(getCurrentSlot())) {
publishRecoveredBlobSidecar(blobSidecar);
}
} else {
countDuplicateBlobSidecar(remoteOrigin);
Expand All @@ -236,6 +243,12 @@ public synchronized void onNewBlobSidecar(
}
}

private void publishRecoveredBlobSidecar(final BlobSidecar blobSidecar) {
LOG.debug("Publishing recovered blob sidecar {}", blobSidecar::toLogString);
gossipValidatorSupplier.get().markForEquivocation(blobSidecar);
blobSidecarGossipPublisher.accept(blobSidecar);
}

private void countBlobSidecar(final RemoteOrigin origin) {
switch (origin) {
case RPC -> poolStatsCounters.labels(COUNTER_SIDECAR_TYPE, COUNTER_RPC_SUBTYPE).inc();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.google.common.annotations.VisibleForTesting;
import java.util.Collections;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.hyperledger.besu.plugin.services.MetricsSystem;
import org.hyperledger.besu.plugin.services.metrics.Counter;
import org.hyperledger.besu.plugin.services.metrics.LabelledMetric;
Expand All @@ -31,6 +32,7 @@
import tech.pegasys.teku.spec.executionlayer.ExecutionLayerChannel;
import tech.pegasys.teku.statetransition.blobs.BlockBlobSidecarsTrackerFactory;
import tech.pegasys.teku.statetransition.block.BlockImportChannel;
import tech.pegasys.teku.statetransition.validation.BlobSidecarGossipValidator;
import tech.pegasys.teku.storage.client.RecentChainData;

public class PoolFactory {
Expand Down Expand Up @@ -117,6 +119,7 @@ public BlockBlobSidecarsTrackersPoolImpl createPoolForBlockBlobSidecarsTrackers(
final AsyncRunner asyncRunner,
final RecentChainData recentChainData,
final ExecutionLayerChannel executionLayer,
final Supplier<BlobSidecarGossipValidator> gossipValidatorSupplier,
final Consumer<BlobSidecar> blobSidecarGossipPublisher) {
return createPoolForBlockBlobSidecarsTrackers(
blockImportChannel,
Expand All @@ -125,6 +128,7 @@ public BlockBlobSidecarsTrackersPoolImpl createPoolForBlockBlobSidecarsTrackers(
asyncRunner,
recentChainData,
executionLayer,
gossipValidatorSupplier,
blobSidecarGossipPublisher,
DEFAULT_HISTORICAL_SLOT_TOLERANCE,
FutureItems.DEFAULT_FUTURE_SLOT_TOLERANCE,
Expand All @@ -138,6 +142,7 @@ public BlockBlobSidecarsTrackersPoolImpl createPoolForBlockBlobSidecarsTrackers(
final AsyncRunner asyncRunner,
final RecentChainData recentChainData,
final ExecutionLayerChannel executionLayer,
final Supplier<BlobSidecarGossipValidator> gossipValidatorSupplier,
final Consumer<BlobSidecar> blobSidecarGossipPublisher,
final UInt64 historicalBlockTolerance,
final UInt64 futureBlockTolerance,
Expand All @@ -151,6 +156,7 @@ public BlockBlobSidecarsTrackersPoolImpl createPoolForBlockBlobSidecarsTrackers(
asyncRunner,
recentChainData,
executionLayer,
gossipValidatorSupplier,
blobSidecarGossipPublisher,
historicalBlockTolerance,
futureBlockTolerance,
Expand All @@ -165,6 +171,7 @@ BlockBlobSidecarsTrackersPoolImpl createPoolForBlockBlobSidecarsTrackers(
final AsyncRunner asyncRunner,
final RecentChainData recentChainData,
final ExecutionLayerChannel executionLayer,
final Supplier<BlobSidecarGossipValidator> gossipValidatorSupplier,
final Consumer<BlobSidecar> blobSidecarGossipPublisher,
final UInt64 historicalBlockTolerance,
final UInt64 futureBlockTolerance,
Expand All @@ -179,6 +186,7 @@ BlockBlobSidecarsTrackersPoolImpl createPoolForBlockBlobSidecarsTrackers(
asyncRunner,
recentChainData,
executionLayer,
gossipValidatorSupplier,
blobSidecarGossipPublisher,
historicalBlockTolerance,
futureBlockTolerance,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,11 +262,7 @@ public SafeFuture<InternalValidationResult> validate(final BlobSidecar blobSidec
* [IGNORE] The sidecar is the first sidecar for the tuple (block_header.slot, block_header.proposer_index, blob_sidecar.index)
* with valid header signature, sidecar inclusion proof, and kzg proof.
*/
if (!receivedValidBlobSidecarInfoSet.add(
new SlotProposerIndexAndBlobIndex(
blockHeader.getSlot(),
blockHeader.getProposerIndex(),
blobSidecar.getIndex()))) {
if (!markForEquivocation(blockHeader, blobSidecar.getIndex())) {
return ignore(
"BlobSidecar is not the first valid for its slot and index. It will be dropped.");
}
Expand All @@ -277,6 +273,17 @@ public SafeFuture<InternalValidationResult> validate(final BlobSidecar blobSidec
});
}

private boolean markForEquivocation(final BeaconBlockHeader blockHeader, final UInt64 index) {
return receivedValidBlobSidecarInfoSet.add(
new SlotProposerIndexAndBlobIndex(
blockHeader.getSlot(), blockHeader.getProposerIndex(), index));
}

public boolean markForEquivocation(final BlobSidecar blobSidecar) {
return markForEquivocation(
blobSidecar.getSignedBeaconBlockHeader().getMessage(), blobSidecar.getIndex());
}

private SafeFuture<InternalValidationResult> validateBlobSidecarWithKnownValidHeader(
final BlobSidecar blobSidecar, final BeaconBlockHeader blockHeader) {

Expand Down Expand Up @@ -310,9 +317,7 @@ private SafeFuture<InternalValidationResult> validateBlobSidecarWithKnownValidHe
* [IGNORE] The sidecar is the first sidecar for the tuple (block_header.slot, block_header.proposer_index, blob_sidecar.index)
* with valid header signature, sidecar inclusion proof, and kzg proof.
*/
if (!receivedValidBlobSidecarInfoSet.add(
new SlotProposerIndexAndBlobIndex(
blockHeader.getSlot(), blockHeader.getProposerIndex(), blobSidecar.getIndex()))) {
if (!markForEquivocation(blockHeader, blobSidecar.getIndex())) {
return SafeFuture.completedFuture(
ignore("BlobSidecar is not the first valid for its slot and index. It will be dropped."));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import tech.pegasys.teku.statetransition.blobs.BlobSidecarManager.RemoteOrigin;
import tech.pegasys.teku.statetransition.blobs.BlockBlobSidecarsTracker;
import tech.pegasys.teku.statetransition.block.BlockImportChannel;
import tech.pegasys.teku.statetransition.validation.BlobSidecarGossipValidator;
import tech.pegasys.teku.storage.client.RecentChainData;

public class BlockBlobSidecarsTrackersPoolImplTest {
Expand All @@ -82,6 +83,9 @@ public class BlockBlobSidecarsTrackersPoolImplTest {
@SuppressWarnings("unchecked")
private final Consumer<BlobSidecar> blobSidecarPublisher = mock(Consumer.class);

private final BlobSidecarGossipValidator blobSidecarGossipValidator =
mock(BlobSidecarGossipValidator.class);

private final BlockImportChannel blockImportChannel = mock(BlockImportChannel.class);
private final int maxItems = 15;
private final BlockBlobSidecarsTrackersPoolImpl blockBlobSidecarsTrackersPool =
Expand All @@ -93,6 +97,7 @@ public class BlockBlobSidecarsTrackersPoolImplTest {
asyncRunner,
recentChainData,
executionLayer,
() -> blobSidecarGossipValidator,
blobSidecarPublisher,
historicalTolerance,
futureTolerance,
Expand Down Expand Up @@ -203,7 +208,7 @@ public void onNewBlobSidecar_shouldIgnoreDuplicates() {
}

@Test
public void onNewBlobSidecar_shouldPublishWhenOriginIsLocalEL() {
public void onNewBlobSidecar_shouldMarkForEquivocationAndPublishWhenOriginIsLocalEL() {
final BlobSidecar blobSidecar1 =
dataStructureUtil
.createRandomBlobSidecarBuilder()
Expand All @@ -220,16 +225,58 @@ public void onNewBlobSidecar_shouldPublishWhenOriginIsLocalEL() {
.signedBeaconBlockHeader(dataStructureUtil.randomSignedBeaconBlockHeader(currentSlot))
.build();

when(blobSidecarGossipValidator.markForEquivocation(blobSidecar1)).thenReturn(true);

blockBlobSidecarsTrackersPool.onNewBlobSidecar(blobSidecar1, RemoteOrigin.LOCAL_EL);
blockBlobSidecarsTrackersPool.onNewBlobSidecar(blobSidecar2, RemoteOrigin.GOSSIP);
blockBlobSidecarsTrackersPool.onNewBlobSidecar(blobSidecar3, RemoteOrigin.RPC);

assertBlobSidecarsCount(3);
assertBlobSidecarsTrackersCount(3);

verify(blobSidecarGossipValidator).markForEquivocation(blobSidecar1);
verify(blobSidecarPublisher, times(1)).accept(blobSidecar1);
}

@Test
public void onNewBlobSidecar_shouldPublishWhenOriginIsLocalELAndEquivocating() {
final BlobSidecar blobSidecar1 =
dataStructureUtil
.createRandomBlobSidecarBuilder()
.signedBeaconBlockHeader(dataStructureUtil.randomSignedBeaconBlockHeader(currentSlot))
.build();

when(blobSidecarGossipValidator.markForEquivocation(blobSidecar1)).thenReturn(false);

blockBlobSidecarsTrackersPool.onNewBlobSidecar(blobSidecar1, RemoteOrigin.LOCAL_EL);

assertBlobSidecarsCount(1);
assertBlobSidecarsTrackersCount(1);

verify(blobSidecarGossipValidator).markForEquivocation(blobSidecar1);
verify(blobSidecarPublisher, times(1)).accept(blobSidecar1);
}

@Test
public void onNewBlobSidecar_shouldNotPublishWhenOriginIsLocalELIsNotCurrentSlot() {
final BlobSidecar blobSidecar1 =
dataStructureUtil
.createRandomBlobSidecarBuilder()
.signedBeaconBlockHeader(dataStructureUtil.randomSignedBeaconBlockHeader(currentSlot))
.build();

when(blobSidecarGossipValidator.markForEquivocation(blobSidecar1)).thenReturn(false);
blockBlobSidecarsTrackersPool.onSlot(currentSlot.plus(1));

blockBlobSidecarsTrackersPool.onNewBlobSidecar(blobSidecar1, RemoteOrigin.LOCAL_EL);

assertBlobSidecarsCount(1);
assertBlobSidecarsTrackersCount(1);

verify(blobSidecarGossipValidator, never()).markForEquivocation(blobSidecar1);
verify(blobSidecarPublisher, never()).accept(blobSidecar1);
}

@Test
public void onNewBlock_shouldIgnorePreDenebBlocks() {
final Spec spec = TestSpecFactory.createMainnetCapella();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

package tech.pegasys.teku.statetransition.validation;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.clearInvocations;
Expand Down Expand Up @@ -277,6 +278,16 @@ void shouldTrackValidInfoSet() {
.isCompletedWithValueMatching(InternalValidationResult::isIgnore);
}

@TestTemplate
void shouldMarkForEquivocation() {
assertThat(blobSidecarValidator.markForEquivocation(blobSidecar)).isTrue();

assertThat(blobSidecarValidator.markForEquivocation(blobSidecar)).isFalse();

SafeFutureAssert.assertThatSafeFuture(blobSidecarValidator.validate(blobSidecar))
.isCompletedWithValueMatching(InternalValidationResult::isIgnore);
}

@TestTemplate
void shouldIgnoreImmediatelyWhenBlobFromValidInfoSet() {
SafeFutureAssert.assertThatSafeFuture(blobSidecarValidator.validate(blobSidecar))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,7 @@ public class BeaconChainController extends Service implements BeaconChainControl
protected volatile GossipValidationHelper gossipValidationHelper;
protected volatile KZG kzg;
protected volatile BlobSidecarManager blobSidecarManager;
protected volatile BlobSidecarGossipValidator blobSidecarValidator;
protected volatile Optional<TerminalPowBlockMonitor> terminalPowBlockMonitor = Optional.empty();
protected volatile ProposersDataManager proposersDataManager;
protected volatile KeyValueStore<String, Bytes> keyValueStore;
Expand Down Expand Up @@ -568,7 +569,7 @@ protected void initBlobSidecarManager() {
LimitedMap.createSynchronizedLRU(500);
final MiscHelpersDeneb miscHelpers =
MiscHelpersDeneb.required(spec.forMilestone(SpecMilestone.DENEB).miscHelpers());
final BlobSidecarGossipValidator blobSidecarValidator =
blobSidecarValidator =
BlobSidecarGossipValidator.create(
spec, invalidBlockRoots, gossipValidationHelper, miscHelpers, kzg);
final BlobSidecarManagerImpl blobSidecarManagerImpl =
Expand Down Expand Up @@ -626,6 +627,7 @@ protected void initBlockBlobSidecarsTrackersPool() {
beaconAsyncRunner,
recentChainData,
executionLayer,
() -> blobSidecarValidator,
blobSidecarGossipChannel::publishBlobSidecar);
eventChannels.subscribe(FinalizedCheckpointChannel.class, pool);
blockBlobSidecarsTrackersPool = pool;
Expand Down

0 comments on commit 234f711

Please sign in to comment.