Skip to content

Commit

Permalink
Implement forward multipeer sync for EIP-4844
Browse files Browse the repository at this point in the history
  • Loading branch information
StefanBratanov committed Jan 13, 2023
1 parent 139fd00 commit 630162b
Show file tree
Hide file tree
Showing 10 changed files with 130 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ protected ForwardSyncService createForwardSyncService() {
pendingBlocks,
p2pNetwork,
blockImporter,
blobsSidecarManager,
spec);
} else {
LOG.info("Using single peer sync");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,27 +16,37 @@
import static com.google.common.base.Preconditions.checkState;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import tech.pegasys.teku.beacon.sync.forward.multipeer.batches.Batch;
import tech.pegasys.teku.infrastructure.async.AsyncRunner;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.networking.eth2.peers.SyncSource;
import tech.pegasys.teku.networking.p2p.peer.DisconnectReason;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
import tech.pegasys.teku.spec.datastructures.execution.versions.eip4844.BlobsSidecar;
import tech.pegasys.teku.spec.logic.common.statetransition.results.BlockImportResult;
import tech.pegasys.teku.statetransition.blobs.BlobsSidecarManager;
import tech.pegasys.teku.statetransition.block.BlockImporter;

public class BatchImporter {
private static final Logger LOG = LogManager.getLogger();

private final BlockImporter blockImporter;
private final BlobsSidecarManager blobsSidecarManager;
private final AsyncRunner asyncRunner;

public BatchImporter(final BlockImporter blockImporter, final AsyncRunner asyncRunner) {
public BatchImporter(
final BlockImporter blockImporter,
final BlobsSidecarManager blobsSidecarManager,
final AsyncRunner asyncRunner) {
this.blockImporter = blockImporter;
this.blobsSidecarManager = blobsSidecarManager;
this.asyncRunner = asyncRunner;
}

Expand All @@ -51,20 +61,29 @@ public BatchImporter(final BlockImporter blockImporter, final AsyncRunner asyncR
public SafeFuture<BatchImportResult> importBatch(final Batch batch) {
// Copy the data from batch as we're going to use them from off the event thread.
final List<SignedBeaconBlock> blocks = new ArrayList<>(batch.getBlocks());
final Map<UInt64, BlobsSidecar> blobsSidecarsBySlot =
new HashMap<>(batch.getBlobsSidecarsBySlot());
final Optional<SyncSource> source = batch.getSource();

checkState(!blocks.isEmpty(), "Batch has no blocks to import");
return asyncRunner.runAsync(
() -> {
final SignedBeaconBlock firstBlock = blocks.get(0);
SafeFuture<BlockImportResult> importResult =
importBlock(blocks.get(0), source.orElseThrow());
storeBlobsSidecarAndImportBlock(
Optional.ofNullable(blobsSidecarsBySlot.get(firstBlock.getSlot())),
firstBlock,
source.orElseThrow());
for (int i = 1; i < blocks.size(); i++) {
final SignedBeaconBlock block = blocks.get(i);
importResult =
importResult.thenCompose(
previousResult -> {
if (previousResult.isSuccessful()) {
return importBlock(block, source.orElseThrow());
return storeBlobsSidecarAndImportBlock(
Optional.ofNullable(blobsSidecarsBySlot.get(block.getSlot())),
block,
source.orElseThrow());
} else {
return SafeFuture.completedFuture(previousResult);
}
Expand All @@ -87,8 +106,11 @@ public SafeFuture<BatchImportResult> importBatch(final Batch batch) {
});
}

private SafeFuture<BlockImportResult> importBlock(
final SignedBeaconBlock block, final SyncSource source) {
private SafeFuture<BlockImportResult> storeBlobsSidecarAndImportBlock(
final Optional<BlobsSidecar> blobsSidecar,
final SignedBeaconBlock block,
final SyncSource source) {
blobsSidecar.ifPresent(blobsSidecarManager::storeUnconfirmedBlobsSidecar);
return blockImporter
.importBlock(block)
.thenApply(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import tech.pegasys.teku.spec.Spec;
import tech.pegasys.teku.spec.config.Constants;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
import tech.pegasys.teku.statetransition.blobs.BlobsSidecarManager;
import tech.pegasys.teku.statetransition.block.BlockImporter;
import tech.pegasys.teku.statetransition.util.PendingPool;
import tech.pegasys.teku.storage.client.RecentChainData;
Expand Down Expand Up @@ -70,6 +71,7 @@ public static MultipeerSyncService create(
final PendingPool<SignedBeaconBlock> pendingBlocks,
final P2PNetwork<Eth2Peer> p2pNetwork,
final BlockImporter blockImporter,
final BlobsSidecarManager blobsSidecarManager,
final Spec spec) {
final EventThread eventThread = new AsyncRunnerEventThread("sync", asyncRunnerFactory);
final SettableLabelledGauge targetChainCountGauge =
Expand All @@ -87,8 +89,9 @@ public static MultipeerSyncService create(
eventThread,
asyncRunner,
recentChainData,
new BatchImporter(blockImporter, asyncRunner),
new BatchFactory(eventThread, new PeerScoringConflictResolutionStrategy()),
new BatchImporter(blockImporter, blobsSidecarManager, asyncRunner),
new BatchFactory(
eventThread, new PeerScoringConflictResolutionStrategy(), blobsSidecarManager),
Constants.SYNC_BATCH_SIZE,
MultipeerCommonAncestorFinder.create(recentChainData, eventThread, spec),
timeProvider);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@
package tech.pegasys.teku.beacon.sync.forward.multipeer.batches;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import tech.pegasys.teku.beacon.sync.forward.multipeer.chains.TargetChain;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.networking.eth2.peers.SyncSource;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
import tech.pegasys.teku.spec.datastructures.execution.versions.eip4844.BlobsSidecar;

/** A section of a particular target chain that can be downloaded in parallel. */
public interface Batch {
Expand All @@ -34,6 +36,8 @@ public interface Batch {

List<SignedBeaconBlock> getBlocks();

Map<UInt64, BlobsSidecar> getBlobsSidecarsBySlot();

Optional<SyncSource> getSource();

void markComplete();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,21 @@
import tech.pegasys.teku.beacon.sync.forward.multipeer.chains.TargetChain;
import tech.pegasys.teku.infrastructure.async.eventthread.EventThread;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.statetransition.blobs.BlobsSidecarManager;

public class BatchFactory {

private final EventThread eventThread;
private final ConflictResolutionStrategy conflictResolutionStrategy;
private final BlobsSidecarManager blobsSidecarManager;

public BatchFactory(
final EventThread eventThread, final ConflictResolutionStrategy conflictResolutionStrategy) {
final EventThread eventThread,
final ConflictResolutionStrategy conflictResolutionStrategy,
final BlobsSidecarManager blobsSidecarManager) {
this.eventThread = eventThread;
this.conflictResolutionStrategy = conflictResolutionStrategy;
this.blobsSidecarManager = blobsSidecarManager;
}

public Batch createBatch(final TargetChain chain, final UInt64 start, final UInt64 count) {
Expand All @@ -33,6 +39,12 @@ public Batch createBatch(final TargetChain chain, final UInt64 start, final UInt
return new EventThreadOnlyBatch(
eventThread,
new SyncSourceBatch(
eventThread, syncSourceProvider, conflictResolutionStrategy, chain, start, count));
eventThread,
syncSourceProvider,
conflictResolutionStrategy,
chain,
blobsSidecarManager,
start,
count));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@

import com.google.common.base.MoreObjects;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import tech.pegasys.teku.beacon.sync.forward.multipeer.chains.TargetChain;
import tech.pegasys.teku.infrastructure.async.eventthread.EventThread;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.networking.eth2.peers.SyncSource;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
import tech.pegasys.teku.spec.datastructures.execution.versions.eip4844.BlobsSidecar;

public class EventThreadOnlyBatch implements Batch {
private final EventThread eventThread;
Expand Down Expand Up @@ -67,6 +69,12 @@ public List<SignedBeaconBlock> getBlocks() {
return delegate.getBlocks();
}

@Override
public Map<UInt64, BlobsSidecar> getBlobsSidecarsBySlot() {
eventThread.checkOnEventThread();
return delegate.getBlobsSidecarsBySlot();
}

@Override
public Optional<SyncSource> getSource() {
eventThread.checkOnEventThread();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
import com.google.common.base.MoreObjects;
import com.google.common.base.Throwables;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand All @@ -34,6 +36,8 @@
import tech.pegasys.teku.networking.p2p.rpc.RpcResponseListener;
import tech.pegasys.teku.spec.datastructures.blocks.MinimalBeaconBlockSummary;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
import tech.pegasys.teku.spec.datastructures.execution.versions.eip4844.BlobsSidecar;
import tech.pegasys.teku.statetransition.blobs.BlobsSidecarManager;

public class SyncSourceBatch implements Batch {
private static final Logger LOG = LogManager.getLogger();
Expand All @@ -42,6 +46,7 @@ public class SyncSourceBatch implements Batch {
private final SyncSourceSelector syncSourceProvider;
private final ConflictResolutionStrategy conflictResolutionStrategy;
private final TargetChain targetChain;
private final BlobsSidecarManager blobsSidecarManager;
private final UInt64 firstSlot;
private final UInt64 count;

Expand All @@ -51,13 +56,16 @@ public class SyncSourceBatch implements Batch {
private boolean firstBlockConfirmed = false;
private boolean lastBlockConfirmed = false;
private boolean awaitingBlocks = false;

private final List<SignedBeaconBlock> blocks = new ArrayList<>();
private final Map<UInt64, BlobsSidecar> blobsSidecarsBySlot = new HashMap<>();

SyncSourceBatch(
final EventThread eventThread,
final SyncSourceSelector syncSourceProvider,
final ConflictResolutionStrategy conflictResolutionStrategy,
final TargetChain targetChain,
final BlobsSidecarManager blobsSidecarManager,
final UInt64 firstSlot,
final UInt64 count) {
checkArgument(
Expand All @@ -66,6 +74,7 @@ public class SyncSourceBatch implements Batch {
this.syncSourceProvider = syncSourceProvider;
this.conflictResolutionStrategy = conflictResolutionStrategy;
this.targetChain = targetChain;
this.blobsSidecarManager = blobsSidecarManager;
this.firstSlot = firstSlot;
this.count = count;
}
Expand Down Expand Up @@ -100,6 +109,11 @@ public List<SignedBeaconBlock> getBlocks() {
return blocks;
}

@Override
public Map<UInt64, BlobsSidecar> getBlobsSidecarsBySlot() {
return blobsSidecarsBySlot;
}

@Override
public Optional<SyncSource> getSource() {
return currentSyncSource;
Expand Down Expand Up @@ -182,10 +196,12 @@ public void markAsInvalid() {
public void requestMoreBlocks(final Runnable callback) {
checkState(
!isComplete() || isContested(), "Attempting to request more blocks from a complete batch");
final RequestHandler requestHandler = new RequestHandler();
final BlockRequestHandler blockRequestHandler = new BlockRequestHandler();
final UInt64 startSlot =
getLastBlock().map(SignedBeaconBlock::getSlot).map(UInt64::increment).orElse(firstSlot);
final UInt64 remainingSlots = count.minus(startSlot.minus(firstSlot));
final UInt64 lastSlot = startSlot.plus(remainingSlots).decrement();

checkState(
remainingSlots.isGreaterThan(UInt64.ZERO),
"Attempting to request more blocks when block for last slot already present.");
Expand All @@ -200,9 +216,28 @@ public void requestMoreBlocks(final Runnable callback) {
final SyncSource syncSource = currentSyncSource.orElseThrow();
LOG.debug(
"Requesting {} slots starting at {} from peer {}", remainingSlots, startSlot, syncSource);
syncSource
.requestBlocksByRange(startSlot, remainingSlots, requestHandler)
.thenRunAsync(() -> onRequestComplete(requestHandler), eventThread)

final SafeFuture<Void> blocksRequest =
syncSource.requestBlocksByRange(startSlot, remainingSlots, blockRequestHandler);

final SafeFuture<Void> blobsSidecarsRequest;
if (blobsSidecarManager.isStorageOfBlobsSidecarRequired(lastSlot)) {
LOG.trace(
"Requesting {} blobs sidecars starting at {} from peer {}",
remainingSlots,
startSlot,
syncSource);
final BlobsSidecarRequestHandler blobsSidecarRequestHandler =
new BlobsSidecarRequestHandler();
blobsSidecarsRequest =
syncSource.requestBlobsSidecarsByRange(
startSlot, remainingSlots, blobsSidecarRequestHandler);
} else {
blobsSidecarsRequest = SafeFuture.COMPLETE;
}

SafeFuture.allOfFailFast(blocksRequest, blobsSidecarsRequest)
.thenRunAsync(() -> onRequestComplete(blockRequestHandler), eventThread)
.handleAsync(
(__, error) -> {
if (error != null) {
Expand Down Expand Up @@ -244,11 +279,12 @@ private void reset() {
firstBlockConfirmed = false;
lastBlockConfirmed = false;
blocks.clear();
blobsSidecarsBySlot.clear();
}

private void onRequestComplete(final RequestHandler requestHandler) {
private void onRequestComplete(final BlockRequestHandler blockRequestHandler) {
eventThread.checkOnEventThread();
final List<SignedBeaconBlock> newBlocks = requestHandler.complete();
final List<SignedBeaconBlock> newBlocks = blockRequestHandler.complete();

awaitingBlocks = false;
if (!blocks.isEmpty() && !newBlocks.isEmpty()) {
Expand Down Expand Up @@ -299,7 +335,7 @@ private String formatBlockAndParent(final SignedBeaconBlock block1) {
+ ")";
}

private static class RequestHandler implements RpcResponseListener<SignedBeaconBlock> {
private static class BlockRequestHandler implements RpcResponseListener<SignedBeaconBlock> {
private final List<SignedBeaconBlock> blocks = new ArrayList<>();

@Override
Expand All @@ -312,4 +348,13 @@ public List<SignedBeaconBlock> complete() {
return blocks;
}
}

private class BlobsSidecarRequestHandler implements RpcResponseListener<BlobsSidecar> {

@Override
public SafeFuture<?> onResponse(final BlobsSidecar response) {
blobsSidecarsBySlot.put(response.getBeaconBlockSlot(), response);
return SafeFuture.COMPLETE;
}
}
}
Loading

0 comments on commit 630162b

Please sign in to comment.