Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BlobSidecars pruning #7085

Merged
merged 7 commits into from
Apr 28, 2023
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import tech.pegasys.teku.spec.datastructures.blocks.SignedBlockAndState;
import tech.pegasys.teku.spec.datastructures.blocks.SlotAndBlockRoot;
import tech.pegasys.teku.spec.datastructures.blocks.StateAndBlockSummary;
import tech.pegasys.teku.spec.datastructures.blocks.blockbody.versions.deneb.SignedBeaconBlockAndBlobsSidecar;
import tech.pegasys.teku.spec.datastructures.execution.SlotAndExecutionPayloadSummary;
import tech.pegasys.teku.spec.datastructures.state.AnchorPoint;
import tech.pegasys.teku.spec.datastructures.state.Checkpoint;
Expand Down Expand Up @@ -113,9 +112,6 @@ default SafeFuture<Optional<BeaconBlock>> retrieveBlock(Bytes32 blockRoot) {

SafeFuture<Optional<SignedBeaconBlock>> retrieveSignedBlock(Bytes32 blockRoot);

SafeFuture<Optional<SignedBeaconBlockAndBlobsSidecar>> retrieveSignedBlockAndBlobsSidecar(
Bytes32 blockRoot);

SafeFuture<Optional<SignedBlockAndState>> retrieveBlockAndState(Bytes32 blockRoot);

SafeFuture<Optional<StateAndBlockSummary>> retrieveStateAndBlockSummary(Bytes32 blockRoot);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import tech.pegasys.teku.spec.datastructures.blocks.SignedBlockAndState;
import tech.pegasys.teku.spec.datastructures.blocks.SlotAndBlockRoot;
import tech.pegasys.teku.spec.datastructures.blocks.StateAndBlockSummary;
import tech.pegasys.teku.spec.datastructures.blocks.blockbody.versions.deneb.SignedBeaconBlockAndBlobsSidecar;
import tech.pegasys.teku.spec.datastructures.execution.ExecutionPayload;
import tech.pegasys.teku.spec.datastructures.execution.SlotAndExecutionPayloadSummary;
import tech.pegasys.teku.spec.datastructures.state.AnchorPoint;
Expand Down Expand Up @@ -196,12 +195,6 @@ public SafeFuture<Optional<SignedBeaconBlock>> retrieveSignedBlock(Bytes32 block
return SafeFuture.completedFuture(getBlockIfAvailable(blockRoot));
}

@Override
public SafeFuture<Optional<SignedBeaconBlockAndBlobsSidecar>> retrieveSignedBlockAndBlobsSidecar(
Bytes32 blockRoot) {
return SafeFuture.failedFuture(new UnsupportedOperationException("Not yet implemented"));
}

@Override
public SafeFuture<Optional<SignedBlockAndState>> retrieveBlockAndState(Bytes32 blockRoot) {
return SafeFuture.completedFuture(getBlockAndState(blockRoot));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2086,6 +2086,11 @@ public List<BlobSidecar> randomBlobSidecars(int count) {
return blobSidecars;
}

public BlobSidecar randomBlobSidecar(
final UInt64 slot, final Bytes32 blockRoot, final UInt64 index) {
return new RandomBlobSidecarBuilder().slot(slot).index(index).blockRoot(blockRoot).build();
}

public BlobSidecar randomBlobSidecar(final Bytes32 blockRoot, final UInt64 index) {
return new RandomBlobSidecarBuilder().index(index).blockRoot(blockRoot).build();
}
Expand Down Expand Up @@ -2363,25 +2368,6 @@ private SchemaDefinitionsDeneb getSchemaDefinitionsDeneb() {
}
}

public SignedBeaconBlockAndBlobsSidecar randomConsistentSignedBeaconBlockAndBlobsSidecar() {
return randomConsistentSignedBeaconBlockAndBlobsSidecar(randomUInt64());
}

public SignedBeaconBlockAndBlobsSidecar randomConsistentSignedBeaconBlockAndBlobsSidecar(
final UInt64 slot) {
final SignedBeaconBlock randomBlock = randomSignedBeaconBlock(slot);
return SchemaDefinitionsDeneb.required(spec.atSlot(slot).getSchemaDefinitions())
.getSignedBeaconBlockAndBlobsSidecarSchema()
.create(
randomBlock,
randomBlobsSidecar(
randomBlock.getRoot(),
randomBlock.getSlot(),
BeaconBlockBodyDeneb.required(randomBlock.getBeaconBlock().orElseThrow().getBody())
.getBlobKzgCommitments()
.size()));
}

public SszList<SszKZGCommitment> randomSszKzgCommitmentList() {
final int count = randomInt(spec.getMaxBlobsPerBlock().orElseThrow()) + 1;
return randomSszKzgCommitmentList(count);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar;
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobsSidecar;
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.SignedBlobSidecar;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
import tech.pegasys.teku.spec.datastructures.blocks.SlotAndBlockRoot;
Expand Down Expand Up @@ -47,18 +46,12 @@ public boolean isAvailabilityRequiredAtSlot(final UInt64 slot) {
return false;
}

@Override
public void storeUnconfirmedValidatedBlobsSidecar(final BlobsSidecar blobsSidecar) {}

@Override
public void storeNoBlobsSlot(SlotAndBlockRoot slotAndBlockRoot) {}

@Override
public void storeBlobSidecar(final BlobSidecar blobSidecar) {}

@Override
public void storeUnconfirmedBlobsSidecar(final BlobsSidecar blobsSidecar) {}

@Override
public void discardBlobSidecarsByBlock(final SignedBeaconBlock block) {}

Expand All @@ -78,14 +71,10 @@ SafeFuture<InternalValidationResult> validateAndImportBlobSidecar(

boolean isAvailabilityRequiredAtSlot(UInt64 slot);

void storeUnconfirmedValidatedBlobsSidecar(BlobsSidecar blobsSidecar);

void storeNoBlobsSlot(SlotAndBlockRoot slotAndBlockRoot);

void storeBlobSidecar(BlobSidecar blobSidecar);

void storeUnconfirmedBlobsSidecar(BlobsSidecar blobsSidecar);

void discardBlobSidecarsByBlock(SignedBeaconBlock block);

BlobSidecarsAvailabilityChecker createAvailabilityChecker(SignedBeaconBlock block);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.apache.tuweni.bytes.Bytes32;
import tech.pegasys.teku.ethereum.events.SlotEventsChannel;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.collections.LimitedMap;
import tech.pegasys.teku.infrastructure.subscribers.Subscribers;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.spec.Spec;
Expand All @@ -45,7 +44,6 @@
import tech.pegasys.teku.storage.client.RecentChainData;

public class BlobSidecarManagerImpl implements BlobSidecarManager, SlotEventsChannel {
private static final int MAX_CACHED_VALIDATED_BLOBS_SIDECARS_PER_SLOT = 10;
private static final Logger LOG = LogManager.getLogger();

private final Spec spec;
Expand Down Expand Up @@ -116,16 +114,6 @@ public boolean isAvailabilityRequiredAtSlot(final UInt64 slot) {
return spec.isAvailabilityOfBlobSidecarsRequiredAtSlot(recentChainData.getStore(), slot);
}

@Override
public void storeUnconfirmedValidatedBlobsSidecar(final BlobsSidecar blobsSidecar) {
// cache already validated blobs
validatedPendingBlobs
.computeIfAbsent(blobsSidecar.getBeaconBlockSlot(), __ -> createNewMap())
.put(blobsSidecar.getBeaconBlockRoot(), blobsSidecar);

internalStoreUnconfirmedBlobsSidecar(blobsSidecar);
}

@Override
public void storeNoBlobsSlot(final SlotAndBlockRoot slotAndBlockRoot) {
storageUpdateChannel
Expand All @@ -150,11 +138,6 @@ public void storeBlobSidecar(final BlobSidecar blobSidecar) {
.ifExceptionGetsHereRaiseABug();
}

@Override
public void storeUnconfirmedBlobsSidecar(final BlobsSidecar blobsSidecar) {
internalStoreUnconfirmedBlobsSidecar(blobsSidecar);
}

@Override
public void discardBlobSidecarsByBlock(final SignedBeaconBlock block) {
storageUpdateChannel
Expand All @@ -179,23 +162,6 @@ public void onSlot(final UInt64 slot) {
validatedPendingBlobs.headMap(slot.minusMinZero(1)).clear();
}

private void internalStoreUnconfirmedBlobsSidecar(final BlobsSidecar blobsSidecar) {
storageUpdateChannel
.onBlobsSidecar(blobsSidecar)
.thenRun(
() ->
LOG.debug(
"Unconfirmed BlobsSidecar stored for {}",
() ->
new SlotAndBlockRoot(
blobsSidecar.getBeaconBlockSlot(), blobsSidecar.getBeaconBlockRoot())))
.ifExceptionGetsHereRaiseABug();
}

private Map<Bytes32, BlobsSidecar> createNewMap() {
return LimitedMap.createSynchronized(MAX_CACHED_VALIDATED_BLOBS_SIDECARS_PER_SLOT);
}

@VisibleForTesting
Map<Bytes32, BlobsSidecar> getValidatedPendingBlobsForSlot(final UInt64 slot) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@

package tech.pegasys.teku.statetransition.blobs;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
Expand All @@ -28,7 +27,8 @@
import tech.pegasys.teku.spec.Spec;
import tech.pegasys.teku.spec.SpecVersion;
import tech.pegasys.teku.spec.TestSpecFactory;
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobsSidecar;
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar;
import tech.pegasys.teku.spec.datastructures.blocks.SlotAndBlockRoot;
import tech.pegasys.teku.spec.logic.common.helpers.MiscHelpers;
import tech.pegasys.teku.spec.util.DataStructureUtil;
import tech.pegasys.teku.statetransition.validation.BlobSidecarValidator;
Expand Down Expand Up @@ -57,53 +57,25 @@ public class BlobSidecarManagerTest {

@BeforeEach
void setUp() {
when(storageUpdateChannel.onBlobsSidecar(any())).thenReturn(SafeFuture.COMPLETE);
when(storageUpdateChannel.onBlobSidecar(any())).thenReturn(SafeFuture.COMPLETE);
when(storageUpdateChannel.onNoBlobsSlot(any())).thenReturn(SafeFuture.COMPLETE);
when(mockedSpec.atSlot(any())).thenReturn(mockedSpecVersion);
when(mockedSpecVersion.miscHelpers()).thenReturn(mockedMiscHelpers);
when(mockedSpec.getForkSchedule()).thenReturn(mockedForkSchedule);
}

@Test
void shouldStoreUnconfirmedValidatedBlobsSidecar() {
final BlobsSidecar blobsSidecar = dataStructureUtil.randomBlobsSidecar();
blobSidecarManager.storeUnconfirmedValidatedBlobsSidecar(blobsSidecar);

verify(storageUpdateChannel).onBlobsSidecar(blobsSidecar);
}

@Test
void shouldStoreUnconfirmedBlobsSidecar() {
final BlobsSidecar blobsSidecar = dataStructureUtil.randomBlobsSidecar();
blobSidecarManager.storeUnconfirmedBlobsSidecar(blobsSidecar);

verify(storageUpdateChannel).onBlobsSidecar(blobsSidecar);
void shouldStoreBlobSidecar() {
final BlobSidecar blobSidecar = dataStructureUtil.randomBlobSidecar();
blobSidecarManager.storeBlobSidecar(blobSidecar);
verify(storageUpdateChannel).onBlobSidecar(blobSidecar);
}

@Test
void shouldDiscardCachedValidatedBlobsOnSlot() {
final Bytes32 blockRoot = dataStructureUtil.randomBytes32();

final BlobsSidecar blobs1 = dataStructureUtil.randomBlobsSidecar(blockRoot, UInt64.ONE);
final BlobsSidecar blobs2 = dataStructureUtil.randomBlobsSidecar(blockRoot, UInt64.valueOf(2));

blobSidecarManager.storeUnconfirmedValidatedBlobsSidecar(blobs1);
blobSidecarManager.storeUnconfirmedValidatedBlobsSidecar(blobs2);

assertThat(blobSidecarManager.getValidatedPendingBlobsForSlot(UInt64.ONE))
.containsEntry(blockRoot, blobs1);
assertThat(blobSidecarManager.getValidatedPendingBlobsForSlot(UInt64.valueOf(2)))
.containsEntry(blockRoot, blobs2);

blobSidecarManager.onSlot(UInt64.valueOf(2));

assertThat(blobSidecarManager.getValidatedPendingBlobsForSlot(UInt64.ONE))
.containsEntry(blockRoot, blobs1);
assertThat(blobSidecarManager.getValidatedPendingBlobsForSlot(UInt64.valueOf(2)))
.containsEntry(blockRoot, blobs2);

blobSidecarManager.onSlot(UInt64.valueOf(4));

assertThat(blobSidecarManager.getValidatedPendingBlobsForSlot(UInt64.ONE)).isEmpty();
assertThat(blobSidecarManager.getValidatedPendingBlobsForSlot(UInt64.valueOf(2))).isEmpty();
void shouldStoreNoBlobsSlot() {
final SlotAndBlockRoot noBlobsSlotAndRoot =
new SlotAndBlockRoot(UInt64.valueOf(3), Bytes32.ZERO);
blobSidecarManager.storeNoBlobsSlot(noBlobsSlotAndRoot);
verify(storageUpdateChannel).onNoBlobsSlot(noBlobsSlotAndRoot);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import tech.pegasys.teku.spec.SpecMilestone;
import tech.pegasys.teku.storage.api.CombinedStorageChannel;
import tech.pegasys.teku.storage.api.Eth1DepositStorageChannel;
import tech.pegasys.teku.storage.api.FinalizedCheckpointChannel;
import tech.pegasys.teku.storage.api.VoteUpdateChannel;
import tech.pegasys.teku.storage.server.BatchingVoteUpdateChannel;
import tech.pegasys.teku.storage.server.ChainStorage;
Expand All @@ -37,7 +36,7 @@
import tech.pegasys.teku.storage.server.RetryingStorageUpdateChannel;
import tech.pegasys.teku.storage.server.StorageConfiguration;
import tech.pegasys.teku.storage.server.VersionedDatabaseFactory;
import tech.pegasys.teku.storage.server.pruner.BlobsSidecarPruner;
import tech.pegasys.teku.storage.server.pruner.BlobSidecarPruner;
import tech.pegasys.teku.storage.server.pruner.BlockPruner;

public class StorageService extends Service implements StorageServiceFacade {
Expand All @@ -47,7 +46,7 @@ public class StorageService extends Service implements StorageServiceFacade {
private volatile Database database;
private volatile BatchingVoteUpdateChannel batchingVoteUpdateChannel;
private volatile Optional<BlockPruner> blockPruner = Optional.empty();
private volatile Optional<BlobsSidecarPruner> blobsPruner = Optional.empty();
private volatile Optional<BlobSidecarPruner> blobsPruner = Optional.empty();
private final boolean depositSnapshotStorageEnabled;
private final boolean blobsSidecarStorageCountersEnabled;

Expand Down Expand Up @@ -93,7 +92,7 @@ protected SafeFuture<?> doStart() {
if (config.getSpec().isMilestoneSupported(SpecMilestone.DENEB)) {
blobsPruner =
Optional.of(
new BlobsSidecarPruner(
new BlobSidecarPruner(
config.getSpec(),
database,
serviceConfig.getMetricsSystem(),
Expand Down Expand Up @@ -130,8 +129,6 @@ protected SafeFuture<?> doStart() {
.subscribe(Eth1DepositStorageChannel.class, depositStorage)
.subscribe(Eth1EventsChannel.class, depositStorage)
.subscribe(VoteUpdateChannel.class, batchingVoteUpdateChannel);
blobsPruner.ifPresent(
pruner -> eventChannels.subscribe(FinalizedCheckpointChannel.class, pruner));
})
.thenCompose(
__ ->
Expand All @@ -141,7 +138,7 @@ protected SafeFuture<?> doStart() {
.thenCompose(
__ ->
blobsPruner
.map(BlobsSidecarPruner::start)
.map(BlobSidecarPruner::start)
.orElseGet(() -> SafeFuture.completedFuture(null)));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import tech.pegasys.teku.infrastructure.events.ChannelInterface;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar;
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobsSidecar;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBlockAndState;
import tech.pegasys.teku.spec.datastructures.blocks.SlotAndBlockRoot;
Expand Down Expand Up @@ -87,6 +86,4 @@ SafeFuture<Optional<StateAndBlockSummary>> getHotStateAndBlockSummaryByBlockRoot

SafeFuture<List<SlotAndBlockRootAndBlobIndex>> getBlobSidecarKeys(
UInt64 startSlot, UInt64 endSlot, UInt64 limit);

SafeFuture<Optional<BlobsSidecar>> getBlobsSidecar(SlotAndBlockRoot slotAndBlockRoot);
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import tech.pegasys.teku.infrastructure.events.ChannelInterface;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobSidecar;
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.BlobsSidecar;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
import tech.pegasys.teku.spec.datastructures.blocks.SlotAndBlockRoot;
import tech.pegasys.teku.spec.datastructures.state.AnchorPoint;
Expand All @@ -48,11 +47,7 @@ SafeFuture<Void> onFinalizedBlocks(

SafeFuture<Void> onBlobSidecar(BlobSidecar blobSidecar);

SafeFuture<Void> onBlobsSidecar(BlobsSidecar blobsSidecar);

SafeFuture<Void> onBlobSidecarsRemoval(UInt64 slot);

SafeFuture<Void> onBlobsSidecarRemoval(SlotAndBlockRoot blobsSidecarKey);

void onChainInitialized(AnchorPoint initialAnchor);
}
Loading