diff --git a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/DefaultSyncServiceFactory.java b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/DefaultSyncServiceFactory.java index 07609676809..93e251f4fc5 100644 --- a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/DefaultSyncServiceFactory.java +++ b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/DefaultSyncServiceFactory.java @@ -159,6 +159,7 @@ protected ForwardSyncService createForwardSyncService() { pendingBlocks, p2pNetwork, blockImporter, + blobsSidecarManager, spec); } else { LOG.info("Using single peer sync"); diff --git a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/BatchImporter.java b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/BatchImporter.java index 3c06fd497b2..b74960d42da 100644 --- a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/BatchImporter.java +++ b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/BatchImporter.java @@ -16,27 +16,37 @@ import static com.google.common.base.Preconditions.checkState; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Optional; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import tech.pegasys.teku.beacon.sync.forward.multipeer.batches.Batch; import tech.pegasys.teku.infrastructure.async.AsyncRunner; import tech.pegasys.teku.infrastructure.async.SafeFuture; +import tech.pegasys.teku.infrastructure.unsigned.UInt64; import tech.pegasys.teku.networking.eth2.peers.SyncSource; import tech.pegasys.teku.networking.p2p.peer.DisconnectReason; import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock; +import tech.pegasys.teku.spec.datastructures.execution.versions.eip4844.BlobsSidecar; import tech.pegasys.teku.spec.logic.common.statetransition.results.BlockImportResult; +import tech.pegasys.teku.statetransition.blobs.BlobsSidecarManager; import tech.pegasys.teku.statetransition.block.BlockImporter; public class BatchImporter { private static final Logger LOG = LogManager.getLogger(); private final BlockImporter blockImporter; + private final BlobsSidecarManager blobsSidecarManager; private final AsyncRunner asyncRunner; - public BatchImporter(final BlockImporter blockImporter, final AsyncRunner asyncRunner) { + public BatchImporter( + final BlockImporter blockImporter, + final BlobsSidecarManager blobsSidecarManager, + final AsyncRunner asyncRunner) { this.blockImporter = blockImporter; + this.blobsSidecarManager = blobsSidecarManager; this.asyncRunner = asyncRunner; } @@ -51,20 +61,29 @@ public BatchImporter(final BlockImporter blockImporter, final AsyncRunner asyncR public SafeFuture importBatch(final Batch batch) { // Copy the data from batch as we're going to use them from off the event thread. final List blocks = new ArrayList<>(batch.getBlocks()); + final Map blobsSidecarsBySlot = + new HashMap<>(batch.getBlobsSidecarsBySlot()); final Optional source = batch.getSource(); checkState(!blocks.isEmpty(), "Batch has no blocks to import"); return asyncRunner.runAsync( () -> { + final SignedBeaconBlock firstBlock = blocks.get(0); SafeFuture importResult = - importBlock(blocks.get(0), source.orElseThrow()); + storeBlobsSidecarAndImportBlock( + Optional.ofNullable(blobsSidecarsBySlot.get(firstBlock.getSlot())), + firstBlock, + source.orElseThrow()); for (int i = 1; i < blocks.size(); i++) { final SignedBeaconBlock block = blocks.get(i); importResult = importResult.thenCompose( previousResult -> { if (previousResult.isSuccessful()) { - return importBlock(block, source.orElseThrow()); + return storeBlobsSidecarAndImportBlock( + Optional.ofNullable(blobsSidecarsBySlot.get(block.getSlot())), + block, + source.orElseThrow()); } else { return SafeFuture.completedFuture(previousResult); } @@ -87,8 +106,11 @@ public SafeFuture importBatch(final Batch batch) { }); } - private SafeFuture importBlock( - final SignedBeaconBlock block, final SyncSource source) { + private SafeFuture storeBlobsSidecarAndImportBlock( + final Optional blobsSidecar, + final SignedBeaconBlock block, + final SyncSource source) { + blobsSidecar.ifPresent(blobsSidecarManager::storeUnconfirmedBlobsSidecar); return blockImporter .importBlock(block) .thenApply( diff --git a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/MultipeerSyncService.java b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/MultipeerSyncService.java index d325f0a8a30..5d1f76ee3d7 100644 --- a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/MultipeerSyncService.java +++ b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/MultipeerSyncService.java @@ -37,6 +37,7 @@ import tech.pegasys.teku.spec.Spec; import tech.pegasys.teku.spec.config.Constants; import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock; +import tech.pegasys.teku.statetransition.blobs.BlobsSidecarManager; import tech.pegasys.teku.statetransition.block.BlockImporter; import tech.pegasys.teku.statetransition.util.PendingPool; import tech.pegasys.teku.storage.client.RecentChainData; @@ -70,6 +71,7 @@ public static MultipeerSyncService create( final PendingPool pendingBlocks, final P2PNetwork p2pNetwork, final BlockImporter blockImporter, + final BlobsSidecarManager blobsSidecarManager, final Spec spec) { final EventThread eventThread = new AsyncRunnerEventThread("sync", asyncRunnerFactory); final SettableLabelledGauge targetChainCountGauge = @@ -87,8 +89,9 @@ public static MultipeerSyncService create( eventThread, asyncRunner, recentChainData, - new BatchImporter(blockImporter, asyncRunner), - new BatchFactory(eventThread, new PeerScoringConflictResolutionStrategy()), + new BatchImporter(blockImporter, blobsSidecarManager, asyncRunner), + new BatchFactory( + eventThread, new PeerScoringConflictResolutionStrategy(), blobsSidecarManager), Constants.SYNC_BATCH_SIZE, MultipeerCommonAncestorFinder.create(recentChainData, eventThread, spec), timeProvider); diff --git a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/batches/Batch.java b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/batches/Batch.java index 568e7f706c4..2a3c32341ea 100644 --- a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/batches/Batch.java +++ b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/batches/Batch.java @@ -14,11 +14,13 @@ package tech.pegasys.teku.beacon.sync.forward.multipeer.batches; import java.util.List; +import java.util.Map; import java.util.Optional; import tech.pegasys.teku.beacon.sync.forward.multipeer.chains.TargetChain; import tech.pegasys.teku.infrastructure.unsigned.UInt64; import tech.pegasys.teku.networking.eth2.peers.SyncSource; import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock; +import tech.pegasys.teku.spec.datastructures.execution.versions.eip4844.BlobsSidecar; /** A section of a particular target chain that can be downloaded in parallel. */ public interface Batch { @@ -34,6 +36,8 @@ public interface Batch { List getBlocks(); + Map getBlobsSidecarsBySlot(); + Optional getSource(); void markComplete(); diff --git a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/batches/BatchFactory.java b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/batches/BatchFactory.java index 41e83fd318c..2544311047e 100644 --- a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/batches/BatchFactory.java +++ b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/batches/BatchFactory.java @@ -16,15 +16,21 @@ import tech.pegasys.teku.beacon.sync.forward.multipeer.chains.TargetChain; import tech.pegasys.teku.infrastructure.async.eventthread.EventThread; import tech.pegasys.teku.infrastructure.unsigned.UInt64; +import tech.pegasys.teku.statetransition.blobs.BlobsSidecarManager; public class BatchFactory { + private final EventThread eventThread; private final ConflictResolutionStrategy conflictResolutionStrategy; + private final BlobsSidecarManager blobsSidecarManager; public BatchFactory( - final EventThread eventThread, final ConflictResolutionStrategy conflictResolutionStrategy) { + final EventThread eventThread, + final ConflictResolutionStrategy conflictResolutionStrategy, + final BlobsSidecarManager blobsSidecarManager) { this.eventThread = eventThread; this.conflictResolutionStrategy = conflictResolutionStrategy; + this.blobsSidecarManager = blobsSidecarManager; } public Batch createBatch(final TargetChain chain, final UInt64 start, final UInt64 count) { @@ -33,6 +39,12 @@ public Batch createBatch(final TargetChain chain, final UInt64 start, final UInt return new EventThreadOnlyBatch( eventThread, new SyncSourceBatch( - eventThread, syncSourceProvider, conflictResolutionStrategy, chain, start, count)); + eventThread, + syncSourceProvider, + conflictResolutionStrategy, + chain, + blobsSidecarManager, + start, + count)); } } diff --git a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/batches/EventThreadOnlyBatch.java b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/batches/EventThreadOnlyBatch.java index 0dab7b705e2..d794940b7aa 100644 --- a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/batches/EventThreadOnlyBatch.java +++ b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/batches/EventThreadOnlyBatch.java @@ -15,12 +15,14 @@ import com.google.common.base.MoreObjects; import java.util.List; +import java.util.Map; import java.util.Optional; import tech.pegasys.teku.beacon.sync.forward.multipeer.chains.TargetChain; import tech.pegasys.teku.infrastructure.async.eventthread.EventThread; import tech.pegasys.teku.infrastructure.unsigned.UInt64; import tech.pegasys.teku.networking.eth2.peers.SyncSource; import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock; +import tech.pegasys.teku.spec.datastructures.execution.versions.eip4844.BlobsSidecar; public class EventThreadOnlyBatch implements Batch { private final EventThread eventThread; @@ -67,6 +69,12 @@ public List getBlocks() { return delegate.getBlocks(); } + @Override + public Map getBlobsSidecarsBySlot() { + eventThread.checkOnEventThread(); + return delegate.getBlobsSidecarsBySlot(); + } + @Override public Optional getSource() { eventThread.checkOnEventThread(); diff --git a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/batches/SyncSourceBatch.java b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/batches/SyncSourceBatch.java index a56fb04acee..f2385ae5203 100644 --- a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/batches/SyncSourceBatch.java +++ b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/batches/SyncSourceBatch.java @@ -19,7 +19,9 @@ import com.google.common.base.MoreObjects; import com.google.common.base.Throwables; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Optional; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -34,6 +36,8 @@ import tech.pegasys.teku.networking.p2p.rpc.RpcResponseListener; import tech.pegasys.teku.spec.datastructures.blocks.MinimalBeaconBlockSummary; import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock; +import tech.pegasys.teku.spec.datastructures.execution.versions.eip4844.BlobsSidecar; +import tech.pegasys.teku.statetransition.blobs.BlobsSidecarManager; public class SyncSourceBatch implements Batch { private static final Logger LOG = LogManager.getLogger(); @@ -42,6 +46,7 @@ public class SyncSourceBatch implements Batch { private final SyncSourceSelector syncSourceProvider; private final ConflictResolutionStrategy conflictResolutionStrategy; private final TargetChain targetChain; + private final BlobsSidecarManager blobsSidecarManager; private final UInt64 firstSlot; private final UInt64 count; @@ -51,13 +56,16 @@ public class SyncSourceBatch implements Batch { private boolean firstBlockConfirmed = false; private boolean lastBlockConfirmed = false; private boolean awaitingBlocks = false; + private final List blocks = new ArrayList<>(); + private final Map blobsSidecarsBySlot = new HashMap<>(); SyncSourceBatch( final EventThread eventThread, final SyncSourceSelector syncSourceProvider, final ConflictResolutionStrategy conflictResolutionStrategy, final TargetChain targetChain, + final BlobsSidecarManager blobsSidecarManager, final UInt64 firstSlot, final UInt64 count) { checkArgument( @@ -66,6 +74,7 @@ public class SyncSourceBatch implements Batch { this.syncSourceProvider = syncSourceProvider; this.conflictResolutionStrategy = conflictResolutionStrategy; this.targetChain = targetChain; + this.blobsSidecarManager = blobsSidecarManager; this.firstSlot = firstSlot; this.count = count; } @@ -100,6 +109,11 @@ public List getBlocks() { return blocks; } + @Override + public Map getBlobsSidecarsBySlot() { + return blobsSidecarsBySlot; + } + @Override public Optional getSource() { return currentSyncSource; @@ -182,10 +196,12 @@ public void markAsInvalid() { public void requestMoreBlocks(final Runnable callback) { checkState( !isComplete() || isContested(), "Attempting to request more blocks from a complete batch"); - final RequestHandler requestHandler = new RequestHandler(); + final BlockRequestHandler blockRequestHandler = new BlockRequestHandler(); final UInt64 startSlot = getLastBlock().map(SignedBeaconBlock::getSlot).map(UInt64::increment).orElse(firstSlot); final UInt64 remainingSlots = count.minus(startSlot.minus(firstSlot)); + final UInt64 lastSlot = startSlot.plus(remainingSlots).decrement(); + checkState( remainingSlots.isGreaterThan(UInt64.ZERO), "Attempting to request more blocks when block for last slot already present."); @@ -198,11 +214,34 @@ public void requestMoreBlocks(final Runnable callback) { } awaitingBlocks = true; final SyncSource syncSource = currentSyncSource.orElseThrow(); + LOG.debug( - "Requesting {} slots starting at {} from peer {}", remainingSlots, startSlot, syncSource); - syncSource - .requestBlocksByRange(startSlot, remainingSlots, requestHandler) - .thenRunAsync(() -> onRequestComplete(requestHandler), eventThread) + "Requesting blocks for {} slots starting at {} from peer {}", + remainingSlots, + startSlot, + syncSource); + + final SafeFuture blocksRequest = + syncSource.requestBlocksByRange(startSlot, remainingSlots, blockRequestHandler); + + final SafeFuture blobsSidecarsRequest; + if (blobsSidecarManager.isStorageOfBlobsSidecarRequired(lastSlot)) { + LOG.debug( + "Requesting blobs sidecars for {} slots starting at {} from peer {}", + remainingSlots, + startSlot, + syncSource); + final BlobsSidecarRequestHandler blobsSidecarRequestHandler = + new BlobsSidecarRequestHandler(); + blobsSidecarsRequest = + syncSource.requestBlobsSidecarsByRange( + startSlot, remainingSlots, blobsSidecarRequestHandler); + } else { + blobsSidecarsRequest = SafeFuture.COMPLETE; + } + + SafeFuture.allOfFailFast(blocksRequest, blobsSidecarsRequest) + .thenRunAsync(() -> onRequestComplete(blockRequestHandler), eventThread) .handleAsync( (__, error) -> { if (error != null) { @@ -244,11 +283,12 @@ private void reset() { firstBlockConfirmed = false; lastBlockConfirmed = false; blocks.clear(); + blobsSidecarsBySlot.clear(); } - private void onRequestComplete(final RequestHandler requestHandler) { + private void onRequestComplete(final BlockRequestHandler blockRequestHandler) { eventThread.checkOnEventThread(); - final List newBlocks = requestHandler.complete(); + final List newBlocks = blockRequestHandler.complete(); awaitingBlocks = false; if (!blocks.isEmpty() && !newBlocks.isEmpty()) { @@ -299,7 +339,7 @@ private String formatBlockAndParent(final SignedBeaconBlock block1) { + ")"; } - private static class RequestHandler implements RpcResponseListener { + private static class BlockRequestHandler implements RpcResponseListener { private final List blocks = new ArrayList<>(); @Override @@ -312,4 +352,13 @@ public List complete() { return blocks; } } + + private class BlobsSidecarRequestHandler implements RpcResponseListener { + + @Override + public SafeFuture onResponse(final BlobsSidecar response) { + blobsSidecarsBySlot.put(response.getBeaconBlockSlot(), response); + return SafeFuture.COMPLETE; + } + } } diff --git a/beacon/sync/src/test/java/tech/pegasys/teku/beacon/sync/forward/multipeer/BatchImporterTest.java b/beacon/sync/src/test/java/tech/pegasys/teku/beacon/sync/forward/multipeer/BatchImporterTest.java index d2d506d7879..cbd7a17dfc0 100644 --- a/beacon/sync/src/test/java/tech/pegasys/teku/beacon/sync/forward/multipeer/BatchImporterTest.java +++ b/beacon/sync/src/test/java/tech/pegasys/teku/beacon/sync/forward/multipeer/BatchImporterTest.java @@ -24,7 +24,9 @@ import static tech.pegasys.teku.infrastructure.async.FutureUtil.ignoreFuture; import java.util.ArrayList; +import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Optional; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -32,27 +34,33 @@ import tech.pegasys.teku.beacon.sync.forward.multipeer.batches.Batch; import tech.pegasys.teku.infrastructure.async.SafeFuture; import tech.pegasys.teku.infrastructure.async.StubAsyncRunner; +import tech.pegasys.teku.infrastructure.unsigned.UInt64; import tech.pegasys.teku.networking.eth2.peers.SyncSource; import tech.pegasys.teku.networking.p2p.peer.DisconnectReason; import tech.pegasys.teku.spec.TestSpecFactory; import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock; +import tech.pegasys.teku.spec.datastructures.execution.versions.eip4844.BlobsSidecar; import tech.pegasys.teku.spec.logic.common.statetransition.results.BlockImportResult; import tech.pegasys.teku.spec.util.DataStructureUtil; +import tech.pegasys.teku.statetransition.blobs.BlobsSidecarManager; import tech.pegasys.teku.statetransition.block.BlockImporter; class BatchImporterTest { private final DataStructureUtil dataStructureUtil = - new DataStructureUtil(TestSpecFactory.createDefault()); + new DataStructureUtil(TestSpecFactory.createMinimalEip4844()); private final BlockImporter blockImporter = mock(BlockImporter.class); + private final BlobsSidecarManager blobsSidecarManager = mock(BlobsSidecarManager.class); private final StubAsyncRunner asyncRunner = new StubAsyncRunner(); private final Batch batch = mock(Batch.class); final SyncSource syncSource = mock(SyncSource.class); - private final BatchImporter importer = new BatchImporter(blockImporter, asyncRunner); + private final BatchImporter importer = + new BatchImporter(blockImporter, blobsSidecarManager, asyncRunner); @BeforeEach public void setup() { when(batch.getSource()).thenReturn(Optional.of(syncSource)); + when(batch.getBlobsSidecarsBySlot()).thenReturn(Collections.emptyMap()); } @Test @@ -74,10 +82,11 @@ void shouldImportBlocksInOrder() { // Should not be started on the calling thread verifyNoInteractions(blockImporter); - // We should have copied the blocks to avoid accessing the Batch data from other threads + // We should have copied the blocks and blobs sidecars to avoid accessing the Batch data from + // other threads verify(batch).getBlocks(); + verify(batch).getBlobsSidecarsBySlot(); verify(batch).getSource(); - blocks.clear(); asyncRunner.executeQueuedActions(); @@ -88,6 +97,67 @@ void shouldImportBlocksInOrder() { blockImportedSuccessfully(block3, importResult3); assertThat(result).isCompletedWithValue(BatchImportResult.IMPORTED_ALL_BLOCKS); + // no blobs sidecars processing is expected + verifyNoInteractions(blobsSidecarManager); + + // And check we didn't touch the batch from a different thread + verifyNoMoreInteractions(batch); + } + + @Test + void shouldImportBlockAndBlobsSidecarsInOrder() { + final SignedBeaconBlock block1 = dataStructureUtil.randomSignedBeaconBlock(1); + final SignedBeaconBlock block2 = dataStructureUtil.randomSignedBeaconBlock(2); + final SignedBeaconBlock block3 = dataStructureUtil.randomSignedBeaconBlock(3); + + final BlobsSidecar blobsSidecar1 = dataStructureUtil.randomBlobsSidecarForBlock(block1); + final BlobsSidecar blobsSidecar2 = dataStructureUtil.randomBlobsSidecarForBlock(block2); + final BlobsSidecar blobsSidecar3 = dataStructureUtil.randomBlobsSidecarForBlock(block3); + + final SafeFuture importResult1 = new SafeFuture<>(); + final SafeFuture importResult2 = new SafeFuture<>(); + final SafeFuture importResult3 = new SafeFuture<>(); + + final List blocks = new ArrayList<>(List.of(block1, block2, block3)); + final Map blobsSidecarsBySlot = + Map.of( + block1.getSlot(), + blobsSidecar1, + block2.getSlot(), + blobsSidecar2, + block3.getSlot(), + blobsSidecar3); + + when(batch.getBlocks()).thenReturn(blocks); + when(batch.getBlobsSidecarsBySlot()).thenReturn(blobsSidecarsBySlot); + + when(blockImporter.importBlock(block1)).thenReturn(importResult1); + when(blockImporter.importBlock(block2)).thenReturn(importResult2); + when(blockImporter.importBlock(block3)).thenReturn(importResult3); + + final SafeFuture result = importer.importBatch(batch); + + // Should not be started on the calling thread + verifyNoInteractions(blockImporter); + + // We should have copied the blocks and blobs sidecars to avoid accessing the Batch data from + // other threads + verify(batch).getBlocks(); + verify(batch).getBlobsSidecarsBySlot(); + verify(batch).getSource(); + + asyncRunner.executeQueuedActions(); + + blobsSidecarImportedSuccessfully(blobsSidecar1); + blockImportedSuccessfully(block1, importResult1); + assertThat(result).isNotDone(); + blobsSidecarImportedSuccessfully(blobsSidecar2); + blockImportedSuccessfully(block2, importResult2); + assertThat(result).isNotDone(); + blobsSidecarImportedSuccessfully(blobsSidecar3); + blockImportedSuccessfully(block3, importResult3); + assertThat(result).isCompletedWithValue(BatchImportResult.IMPORTED_ALL_BLOCKS); + // And check we didn't touch the batch from a different thread verifyNoMoreInteractions(batch); } @@ -192,6 +262,11 @@ void shouldNotDisconnectPeersWhenServiceOffline() { verifyNoMoreInteractions(blockImporter); } + private void blobsSidecarImportedSuccessfully(final BlobsSidecar blobsSidecar) { + verify(blobsSidecarManager).storeUnconfirmedBlobsSidecar(blobsSidecar); + verifyNoMoreInteractions(blobsSidecarManager); + } + private void blockImportedSuccessfully( final SignedBeaconBlock block, final SafeFuture importResult1) { ignoreFuture(verify(blockImporter).importBlock(block)); diff --git a/beacon/sync/src/test/java/tech/pegasys/teku/beacon/sync/forward/multipeer/batches/SyncSourceBatchTest.java b/beacon/sync/src/test/java/tech/pegasys/teku/beacon/sync/forward/multipeer/batches/SyncSourceBatchTest.java index d45a4389397..e6fe432ceab 100644 --- a/beacon/sync/src/test/java/tech/pegasys/teku/beacon/sync/forward/multipeer/batches/SyncSourceBatchTest.java +++ b/beacon/sync/src/test/java/tech/pegasys/teku/beacon/sync/forward/multipeer/batches/SyncSourceBatchTest.java @@ -20,6 +20,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; import static tech.pegasys.teku.beacon.sync.forward.multipeer.batches.BatchAssert.assertThatBatch; import static tech.pegasys.teku.beacon.sync.forward.multipeer.chains.TargetChainTestUtil.chainWith; @@ -30,6 +31,7 @@ import java.util.Optional; import org.apache.tuweni.bytes.Bytes32; import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; import tech.pegasys.teku.beacon.sync.forward.multipeer.chains.TargetChain; import tech.pegasys.teku.infrastructure.async.eventthread.InlineEventThread; import tech.pegasys.teku.infrastructure.unsigned.UInt64; @@ -40,17 +42,20 @@ import tech.pegasys.teku.spec.TestSpecFactory; import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock; import tech.pegasys.teku.spec.datastructures.blocks.SlotAndBlockRoot; +import tech.pegasys.teku.spec.datastructures.execution.versions.eip4844.BlobsSidecar; import tech.pegasys.teku.spec.util.DataStructureUtil; +import tech.pegasys.teku.statetransition.blobs.BlobsSidecarManager; public class SyncSourceBatchTest { private final DataStructureUtil dataStructureUtil = - new DataStructureUtil(TestSpecFactory.createDefault()); + new DataStructureUtil(TestSpecFactory.createMinimalEip4844()); private final TargetChain targetChain = chainWith(new SlotAndBlockRoot(UInt64.valueOf(1000), Bytes32.ZERO)); private final InlineEventThread eventThread = new InlineEventThread(); private final ConflictResolutionStrategy conflictResolutionStrategy = mock(ConflictResolutionStrategy.class); + private final BlobsSidecarManager blobsSidecarManager = mock(BlobsSidecarManager.class); private final Map> syncSources = new HashMap<>(); @Test @@ -119,6 +124,43 @@ void requestMoreBlocks_shouldResetAndSelectNewPeerAfterDisconnection() { secondSyncSource.assertRequestedBlocks(70, 50); } + @Test + void requestMoreBlocks_shouldRequestSidecarsIfRequired() { + final Runnable callback = mock(Runnable.class); + final ArgumentCaptor lastSlotArgumentCaptor = ArgumentCaptor.forClass(UInt64.class); + when(blobsSidecarManager.isStorageOfBlobsSidecarRequired(lastSlotArgumentCaptor.capture())) + .thenReturn(true); + + final Batch batch = createBatch(70, 50); + + batch.requestMoreBlocks(callback); + verifyNoInteractions(callback); + + final SignedBeaconBlock block = dataStructureUtil.randomSignedBeaconBlock(75); + final BlobsSidecar blobsSidecar = dataStructureUtil.randomBlobsSidecarForBlock(block); + + receiveBlocks(batch, block); + receiveBlobsSidecars(batch, blobsSidecar); + + verify(callback).run(); + batch.markFirstBlockConfirmed(); + batch.markAsContested(); + + final Map blobsSidecars = batch.getBlobsSidecarsBySlot(); + + // verify sidecars are cached + assertThat(blobsSidecars).hasSize(1); + assertThat(blobsSidecars.get(UInt64.valueOf(75))).isEqualTo(blobsSidecar); + assertThat(batch.getBlocks()).containsExactly(block); + + batch.requestMoreBlocks(callback); + getSyncSource(batch).assertRequestedBlocks(76, 44); + getSyncSource(batch).assertRequestedBlobsSidecars(76, 44); + + assertThat(lastSlotArgumentCaptor.getAllValues()) + .containsExactly(UInt64.valueOf(119), UInt64.valueOf(119)); + } + @Test void markContested_shouldVerifyBatchWithConflictResolutionStrategy() { final Batch batch = createBatch(1, 3); @@ -194,6 +236,7 @@ void shouldSkipMakingRequestWhenNoTargetPeerIsAvailable() { emptySourceSelector, conflictResolutionStrategy, targetChain, + blobsSidecarManager, UInt64.ONE, UInt64.ONE); @@ -222,6 +265,7 @@ protected Batch createBatch(final long startSlot, final long count) { syncSourceProvider, conflictResolutionStrategy, targetChain, + blobsSidecarManager, UInt64.valueOf(startSlot), UInt64.valueOf(count)); this.syncSources.put(batch, syncSources); @@ -232,6 +276,10 @@ protected void receiveBlocks(final Batch batch, final SignedBeaconBlock... block getSyncSource(batch).receiveBlocks(blocks); } + protected void receiveBlobsSidecars(final Batch batch, final BlobsSidecar... blobsSidecars) { + getSyncSource(batch).receiveBlobsSidecars(blobsSidecars); + } + protected void requestError(final Batch batch, final Throwable error) { getSyncSource(batch).failRequest(error); } diff --git a/beacon/sync/src/testFixtures/java/tech/pegasys/teku/beacon/sync/forward/multipeer/batches/StubBatchFactory.java b/beacon/sync/src/testFixtures/java/tech/pegasys/teku/beacon/sync/forward/multipeer/batches/StubBatchFactory.java index 65a2f51d62e..56a67d42dab 100644 --- a/beacon/sync/src/testFixtures/java/tech/pegasys/teku/beacon/sync/forward/multipeer/batches/StubBatchFactory.java +++ b/beacon/sync/src/testFixtures/java/tech/pegasys/teku/beacon/sync/forward/multipeer/batches/StubBatchFactory.java @@ -29,6 +29,7 @@ import tech.pegasys.teku.networking.eth2.peers.StubSyncSource; import tech.pegasys.teku.networking.eth2.peers.SyncSource; import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock; +import tech.pegasys.teku.statetransition.blobs.BlobsSidecarManager; public class StubBatchFactory extends BatchFactory implements Iterable { private final List batches = new ArrayList<>(); @@ -37,8 +38,8 @@ public class StubBatchFactory extends BatchFactory implements Iterable { private final EventThread eventThread; private final boolean enforceEventThread; - public StubBatchFactory(final EventThread eventThread, final boolean enforceEventThread) { - super(eventThread, null); + public StubBatchFactory(final EventThread eventThread, boolean enforceEventThread) { + super(eventThread, null, BlobsSidecarManager.NOOP); this.eventThread = eventThread; this.enforceEventThread = enforceEventThread; } @@ -121,7 +122,9 @@ public BatchSupport( final TargetChain chain, final UInt64 start, final UInt64 count) { - batch = new SyncSourceBatch(eventThread, this, this, chain, start, count); + batch = + new SyncSourceBatch( + eventThread, this, this, chain, BlobsSidecarManager.NOOP, start, count); eventThreadOnlyBatch = new EventThreadOnlyBatch(eventThread, batch); } diff --git a/networking/eth2/src/testFixtures/java/tech/pegasys/teku/networking/eth2/peers/StubSyncSource.java b/networking/eth2/src/testFixtures/java/tech/pegasys/teku/networking/eth2/peers/StubSyncSource.java index cff7d3b17e8..9586130c5df 100644 --- a/networking/eth2/src/testFixtures/java/tech/pegasys/teku/networking/eth2/peers/StubSyncSource.java +++ b/networking/eth2/src/testFixtures/java/tech/pegasys/teku/networking/eth2/peers/StubSyncSource.java @@ -31,18 +31,31 @@ public class StubSyncSource implements SyncSource { - private final List requests = new ArrayList<>(); - private Optional> currentRequest = Optional.empty(); - private Optional> currentListener = Optional.empty(); + private final List blocksRequests = new ArrayList<>(); + private final List blobsSidecarsRequests = new ArrayList<>(); + + private Optional> currentBlockRequest = Optional.empty(); + private Optional> currentBlockListener = Optional.empty(); + + private Optional> currentBlobsSidecarRequest = Optional.empty(); + private Optional> currentBlobsSidecarListener = + Optional.empty(); public void receiveBlocks(final SignedBeaconBlock... blocks) { - final RpcResponseListener listener = currentListener.orElseThrow(); + final RpcResponseListener listener = currentBlockListener.orElseThrow(); Stream.of(blocks).forEach(response -> assertThat(listener.onResponse(response)).isCompleted()); - currentRequest.orElseThrow().complete(null); + currentBlockRequest.orElseThrow().complete(null); + } + + public void receiveBlobsSidecars(final BlobsSidecar... blobsSidecars) { + final RpcResponseListener listener = currentBlobsSidecarListener.orElseThrow(); + Stream.of(blobsSidecars) + .forEach(response -> assertThat(listener.onResponse(response)).isCompleted()); + currentBlobsSidecarRequest.orElseThrow().complete(null); } public void failRequest(final Throwable error) { - currentRequest.orElseThrow().completeExceptionally(error); + currentBlockRequest.orElseThrow().completeExceptionally(error); } @Override @@ -51,10 +64,10 @@ public SafeFuture requestBlocksByRange( final UInt64 count, final RpcResponseListener listener) { checkArgument(count.isGreaterThan(UInt64.ZERO), "Count must be greater than zero"); - requests.add(new Request(startSlot, count)); + blocksRequests.add(new Request(startSlot, count)); final SafeFuture request = new SafeFuture<>(); - currentRequest = Optional.of(request); - currentListener = Optional.of(listener); + currentBlockRequest = Optional.of(request); + currentBlockListener = Optional.of(listener); return request; } @@ -63,7 +76,12 @@ public SafeFuture requestBlobsSidecarsByRange( final UInt64 startSlot, final UInt64 count, final RpcResponseListener listener) { - return SafeFuture.failedFuture(new UnsupportedOperationException("Not yet implemented")); + checkArgument(count.isGreaterThan(UInt64.ZERO), "Count must be greater than zero"); + blobsSidecarsRequests.add(new Request(startSlot, count)); + final SafeFuture request = new SafeFuture<>(); + currentBlobsSidecarRequest = Optional.of(request); + currentBlobsSidecarListener = Optional.of(listener); + return request; } @Override @@ -72,7 +90,13 @@ public SafeFuture disconnectCleanly(final DisconnectReason reason) { } public void assertRequestedBlocks(final long startSlot, final long count) { - assertThat(requests).contains(new Request(UInt64.valueOf(startSlot), UInt64.valueOf(count))); + assertThat(blocksRequests) + .contains(new Request(UInt64.valueOf(startSlot), UInt64.valueOf(count))); + } + + public void assertRequestedBlobsSidecars(final long startSlot, final long count) { + assertThat(blobsSidecarsRequests) + .contains(new Request(UInt64.valueOf(startSlot), UInt64.valueOf(count))); } @Override