From 19b3986679efc9483ec847a5a0fa508cdc8330a7 Mon Sep 17 00:00:00 2001 From: Stefan Bratanov Date: Fri, 13 Jan 2023 16:43:57 +0000 Subject: [PATCH] Implement forward multipeer sync for EIP-4844 --- .../sync/DefaultSyncServiceFactory.java | 1 + .../sync/forward/multipeer/BatchImporter.java | 32 ++++++++-- .../multipeer/MultipeerSyncService.java | 7 ++- .../sync/forward/multipeer/batches/Batch.java | 4 ++ .../multipeer/batches/BatchFactory.java | 16 ++++- .../batches/EventThreadOnlyBatch.java | 8 +++ .../multipeer/batches/SyncSourceBatch.java | 59 ++++++++++++++++--- .../forward/multipeer/BatchImporterTest.java | 11 +++- .../batches/SyncSourceBatchTest.java | 4 ++ .../multipeer/batches/StubBatchFactory.java | 9 ++- 10 files changed, 130 insertions(+), 21 deletions(-) diff --git a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/DefaultSyncServiceFactory.java b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/DefaultSyncServiceFactory.java index 07609676809..93e251f4fc5 100644 --- a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/DefaultSyncServiceFactory.java +++ b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/DefaultSyncServiceFactory.java @@ -159,6 +159,7 @@ protected ForwardSyncService createForwardSyncService() { pendingBlocks, p2pNetwork, blockImporter, + blobsSidecarManager, spec); } else { LOG.info("Using single peer sync"); diff --git a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/BatchImporter.java b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/BatchImporter.java index 3c06fd497b2..b74960d42da 100644 --- a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/BatchImporter.java +++ b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/BatchImporter.java @@ -16,27 +16,37 @@ import static com.google.common.base.Preconditions.checkState; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Optional; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import tech.pegasys.teku.beacon.sync.forward.multipeer.batches.Batch; import tech.pegasys.teku.infrastructure.async.AsyncRunner; import tech.pegasys.teku.infrastructure.async.SafeFuture; +import tech.pegasys.teku.infrastructure.unsigned.UInt64; import tech.pegasys.teku.networking.eth2.peers.SyncSource; import tech.pegasys.teku.networking.p2p.peer.DisconnectReason; import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock; +import tech.pegasys.teku.spec.datastructures.execution.versions.eip4844.BlobsSidecar; import tech.pegasys.teku.spec.logic.common.statetransition.results.BlockImportResult; +import tech.pegasys.teku.statetransition.blobs.BlobsSidecarManager; import tech.pegasys.teku.statetransition.block.BlockImporter; public class BatchImporter { private static final Logger LOG = LogManager.getLogger(); private final BlockImporter blockImporter; + private final BlobsSidecarManager blobsSidecarManager; private final AsyncRunner asyncRunner; - public BatchImporter(final BlockImporter blockImporter, final AsyncRunner asyncRunner) { + public BatchImporter( + final BlockImporter blockImporter, + final BlobsSidecarManager blobsSidecarManager, + final AsyncRunner asyncRunner) { this.blockImporter = blockImporter; + this.blobsSidecarManager = blobsSidecarManager; this.asyncRunner = asyncRunner; } @@ -51,20 +61,29 @@ public BatchImporter(final BlockImporter blockImporter, final AsyncRunner asyncR public SafeFuture importBatch(final Batch batch) { // Copy the data from batch as we're going to use them from off the event thread. final List blocks = new ArrayList<>(batch.getBlocks()); + final Map blobsSidecarsBySlot = + new HashMap<>(batch.getBlobsSidecarsBySlot()); final Optional source = batch.getSource(); checkState(!blocks.isEmpty(), "Batch has no blocks to import"); return asyncRunner.runAsync( () -> { + final SignedBeaconBlock firstBlock = blocks.get(0); SafeFuture importResult = - importBlock(blocks.get(0), source.orElseThrow()); + storeBlobsSidecarAndImportBlock( + Optional.ofNullable(blobsSidecarsBySlot.get(firstBlock.getSlot())), + firstBlock, + source.orElseThrow()); for (int i = 1; i < blocks.size(); i++) { final SignedBeaconBlock block = blocks.get(i); importResult = importResult.thenCompose( previousResult -> { if (previousResult.isSuccessful()) { - return importBlock(block, source.orElseThrow()); + return storeBlobsSidecarAndImportBlock( + Optional.ofNullable(blobsSidecarsBySlot.get(block.getSlot())), + block, + source.orElseThrow()); } else { return SafeFuture.completedFuture(previousResult); } @@ -87,8 +106,11 @@ public SafeFuture importBatch(final Batch batch) { }); } - private SafeFuture importBlock( - final SignedBeaconBlock block, final SyncSource source) { + private SafeFuture storeBlobsSidecarAndImportBlock( + final Optional blobsSidecar, + final SignedBeaconBlock block, + final SyncSource source) { + blobsSidecar.ifPresent(blobsSidecarManager::storeUnconfirmedBlobsSidecar); return blockImporter .importBlock(block) .thenApply( diff --git a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/MultipeerSyncService.java b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/MultipeerSyncService.java index d325f0a8a30..5d1f76ee3d7 100644 --- a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/MultipeerSyncService.java +++ b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/MultipeerSyncService.java @@ -37,6 +37,7 @@ import tech.pegasys.teku.spec.Spec; import tech.pegasys.teku.spec.config.Constants; import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock; +import tech.pegasys.teku.statetransition.blobs.BlobsSidecarManager; import tech.pegasys.teku.statetransition.block.BlockImporter; import tech.pegasys.teku.statetransition.util.PendingPool; import tech.pegasys.teku.storage.client.RecentChainData; @@ -70,6 +71,7 @@ public static MultipeerSyncService create( final PendingPool pendingBlocks, final P2PNetwork p2pNetwork, final BlockImporter blockImporter, + final BlobsSidecarManager blobsSidecarManager, final Spec spec) { final EventThread eventThread = new AsyncRunnerEventThread("sync", asyncRunnerFactory); final SettableLabelledGauge targetChainCountGauge = @@ -87,8 +89,9 @@ public static MultipeerSyncService create( eventThread, asyncRunner, recentChainData, - new BatchImporter(blockImporter, asyncRunner), - new BatchFactory(eventThread, new PeerScoringConflictResolutionStrategy()), + new BatchImporter(blockImporter, blobsSidecarManager, asyncRunner), + new BatchFactory( + eventThread, new PeerScoringConflictResolutionStrategy(), blobsSidecarManager), Constants.SYNC_BATCH_SIZE, MultipeerCommonAncestorFinder.create(recentChainData, eventThread, spec), timeProvider); diff --git a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/batches/Batch.java b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/batches/Batch.java index 568e7f706c4..2a3c32341ea 100644 --- a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/batches/Batch.java +++ b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/batches/Batch.java @@ -14,11 +14,13 @@ package tech.pegasys.teku.beacon.sync.forward.multipeer.batches; import java.util.List; +import java.util.Map; import java.util.Optional; import tech.pegasys.teku.beacon.sync.forward.multipeer.chains.TargetChain; import tech.pegasys.teku.infrastructure.unsigned.UInt64; import tech.pegasys.teku.networking.eth2.peers.SyncSource; import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock; +import tech.pegasys.teku.spec.datastructures.execution.versions.eip4844.BlobsSidecar; /** A section of a particular target chain that can be downloaded in parallel. */ public interface Batch { @@ -34,6 +36,8 @@ public interface Batch { List getBlocks(); + Map getBlobsSidecarsBySlot(); + Optional getSource(); void markComplete(); diff --git a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/batches/BatchFactory.java b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/batches/BatchFactory.java index 41e83fd318c..2544311047e 100644 --- a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/batches/BatchFactory.java +++ b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/batches/BatchFactory.java @@ -16,15 +16,21 @@ import tech.pegasys.teku.beacon.sync.forward.multipeer.chains.TargetChain; import tech.pegasys.teku.infrastructure.async.eventthread.EventThread; import tech.pegasys.teku.infrastructure.unsigned.UInt64; +import tech.pegasys.teku.statetransition.blobs.BlobsSidecarManager; public class BatchFactory { + private final EventThread eventThread; private final ConflictResolutionStrategy conflictResolutionStrategy; + private final BlobsSidecarManager blobsSidecarManager; public BatchFactory( - final EventThread eventThread, final ConflictResolutionStrategy conflictResolutionStrategy) { + final EventThread eventThread, + final ConflictResolutionStrategy conflictResolutionStrategy, + final BlobsSidecarManager blobsSidecarManager) { this.eventThread = eventThread; this.conflictResolutionStrategy = conflictResolutionStrategy; + this.blobsSidecarManager = blobsSidecarManager; } public Batch createBatch(final TargetChain chain, final UInt64 start, final UInt64 count) { @@ -33,6 +39,12 @@ public Batch createBatch(final TargetChain chain, final UInt64 start, final UInt return new EventThreadOnlyBatch( eventThread, new SyncSourceBatch( - eventThread, syncSourceProvider, conflictResolutionStrategy, chain, start, count)); + eventThread, + syncSourceProvider, + conflictResolutionStrategy, + chain, + blobsSidecarManager, + start, + count)); } } diff --git a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/batches/EventThreadOnlyBatch.java b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/batches/EventThreadOnlyBatch.java index 0dab7b705e2..d794940b7aa 100644 --- a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/batches/EventThreadOnlyBatch.java +++ b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/batches/EventThreadOnlyBatch.java @@ -15,12 +15,14 @@ import com.google.common.base.MoreObjects; import java.util.List; +import java.util.Map; import java.util.Optional; import tech.pegasys.teku.beacon.sync.forward.multipeer.chains.TargetChain; import tech.pegasys.teku.infrastructure.async.eventthread.EventThread; import tech.pegasys.teku.infrastructure.unsigned.UInt64; import tech.pegasys.teku.networking.eth2.peers.SyncSource; import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock; +import tech.pegasys.teku.spec.datastructures.execution.versions.eip4844.BlobsSidecar; public class EventThreadOnlyBatch implements Batch { private final EventThread eventThread; @@ -67,6 +69,12 @@ public List getBlocks() { return delegate.getBlocks(); } + @Override + public Map getBlobsSidecarsBySlot() { + eventThread.checkOnEventThread(); + return delegate.getBlobsSidecarsBySlot(); + } + @Override public Optional getSource() { eventThread.checkOnEventThread(); diff --git a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/batches/SyncSourceBatch.java b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/batches/SyncSourceBatch.java index a56fb04acee..a1e617f3f0d 100644 --- a/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/batches/SyncSourceBatch.java +++ b/beacon/sync/src/main/java/tech/pegasys/teku/beacon/sync/forward/multipeer/batches/SyncSourceBatch.java @@ -19,7 +19,9 @@ import com.google.common.base.MoreObjects; import com.google.common.base.Throwables; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Optional; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -34,6 +36,8 @@ import tech.pegasys.teku.networking.p2p.rpc.RpcResponseListener; import tech.pegasys.teku.spec.datastructures.blocks.MinimalBeaconBlockSummary; import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock; +import tech.pegasys.teku.spec.datastructures.execution.versions.eip4844.BlobsSidecar; +import tech.pegasys.teku.statetransition.blobs.BlobsSidecarManager; public class SyncSourceBatch implements Batch { private static final Logger LOG = LogManager.getLogger(); @@ -42,6 +46,7 @@ public class SyncSourceBatch implements Batch { private final SyncSourceSelector syncSourceProvider; private final ConflictResolutionStrategy conflictResolutionStrategy; private final TargetChain targetChain; + private final BlobsSidecarManager blobsSidecarManager; private final UInt64 firstSlot; private final UInt64 count; @@ -51,13 +56,16 @@ public class SyncSourceBatch implements Batch { private boolean firstBlockConfirmed = false; private boolean lastBlockConfirmed = false; private boolean awaitingBlocks = false; + private final List blocks = new ArrayList<>(); + private final Map blobsSidecarsBySlot = new HashMap<>(); SyncSourceBatch( final EventThread eventThread, final SyncSourceSelector syncSourceProvider, final ConflictResolutionStrategy conflictResolutionStrategy, final TargetChain targetChain, + final BlobsSidecarManager blobsSidecarManager, final UInt64 firstSlot, final UInt64 count) { checkArgument( @@ -66,6 +74,7 @@ public class SyncSourceBatch implements Batch { this.syncSourceProvider = syncSourceProvider; this.conflictResolutionStrategy = conflictResolutionStrategy; this.targetChain = targetChain; + this.blobsSidecarManager = blobsSidecarManager; this.firstSlot = firstSlot; this.count = count; } @@ -100,6 +109,11 @@ public List getBlocks() { return blocks; } + @Override + public Map getBlobsSidecarsBySlot() { + return blobsSidecarsBySlot; + } + @Override public Optional getSource() { return currentSyncSource; @@ -182,10 +196,12 @@ public void markAsInvalid() { public void requestMoreBlocks(final Runnable callback) { checkState( !isComplete() || isContested(), "Attempting to request more blocks from a complete batch"); - final RequestHandler requestHandler = new RequestHandler(); + final BlockRequestHandler blockRequestHandler = new BlockRequestHandler(); final UInt64 startSlot = getLastBlock().map(SignedBeaconBlock::getSlot).map(UInt64::increment).orElse(firstSlot); final UInt64 remainingSlots = count.minus(startSlot.minus(firstSlot)); + final UInt64 lastSlot = startSlot.plus(remainingSlots).decrement(); + checkState( remainingSlots.isGreaterThan(UInt64.ZERO), "Attempting to request more blocks when block for last slot already present."); @@ -200,9 +216,28 @@ public void requestMoreBlocks(final Runnable callback) { final SyncSource syncSource = currentSyncSource.orElseThrow(); LOG.debug( "Requesting {} slots starting at {} from peer {}", remainingSlots, startSlot, syncSource); - syncSource - .requestBlocksByRange(startSlot, remainingSlots, requestHandler) - .thenRunAsync(() -> onRequestComplete(requestHandler), eventThread) + + final SafeFuture blocksRequest = + syncSource.requestBlocksByRange(startSlot, remainingSlots, blockRequestHandler); + + final SafeFuture blobsSidecarsRequest; + if (blobsSidecarManager.isStorageOfBlobsSidecarRequired(lastSlot)) { + LOG.trace( + "Requesting {} blobs sidecars starting at {} from peer {}", + remainingSlots, + startSlot, + syncSource); + final BlobsSidecarRequestHandler blobsSidecarRequestHandler = + new BlobsSidecarRequestHandler(); + blobsSidecarsRequest = + syncSource.requestBlobsSidecarsByRange( + startSlot, remainingSlots, blobsSidecarRequestHandler); + } else { + blobsSidecarsRequest = SafeFuture.COMPLETE; + } + + SafeFuture.allOfFailFast(blocksRequest, blobsSidecarsRequest) + .thenRunAsync(() -> onRequestComplete(blockRequestHandler), eventThread) .handleAsync( (__, error) -> { if (error != null) { @@ -244,11 +279,12 @@ private void reset() { firstBlockConfirmed = false; lastBlockConfirmed = false; blocks.clear(); + blobsSidecarsBySlot.clear(); } - private void onRequestComplete(final RequestHandler requestHandler) { + private void onRequestComplete(final BlockRequestHandler blockRequestHandler) { eventThread.checkOnEventThread(); - final List newBlocks = requestHandler.complete(); + final List newBlocks = blockRequestHandler.complete(); awaitingBlocks = false; if (!blocks.isEmpty() && !newBlocks.isEmpty()) { @@ -299,7 +335,7 @@ private String formatBlockAndParent(final SignedBeaconBlock block1) { + ")"; } - private static class RequestHandler implements RpcResponseListener { + private static class BlockRequestHandler implements RpcResponseListener { private final List blocks = new ArrayList<>(); @Override @@ -312,4 +348,13 @@ public List complete() { return blocks; } } + + private class BlobsSidecarRequestHandler implements RpcResponseListener { + + @Override + public SafeFuture onResponse(final BlobsSidecar response) { + blobsSidecarsBySlot.put(response.getBeaconBlockSlot(), response); + return SafeFuture.COMPLETE; + } + } } diff --git a/beacon/sync/src/test/java/tech/pegasys/teku/beacon/sync/forward/multipeer/BatchImporterTest.java b/beacon/sync/src/test/java/tech/pegasys/teku/beacon/sync/forward/multipeer/BatchImporterTest.java index d2d506d7879..d5ab3e1f165 100644 --- a/beacon/sync/src/test/java/tech/pegasys/teku/beacon/sync/forward/multipeer/BatchImporterTest.java +++ b/beacon/sync/src/test/java/tech/pegasys/teku/beacon/sync/forward/multipeer/BatchImporterTest.java @@ -24,6 +24,7 @@ import static tech.pegasys.teku.infrastructure.async.FutureUtil.ignoreFuture; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Optional; import org.junit.jupiter.api.BeforeEach; @@ -38,21 +39,25 @@ import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock; import tech.pegasys.teku.spec.logic.common.statetransition.results.BlockImportResult; import tech.pegasys.teku.spec.util.DataStructureUtil; +import tech.pegasys.teku.statetransition.blobs.BlobsSidecarManager; import tech.pegasys.teku.statetransition.block.BlockImporter; class BatchImporterTest { private final DataStructureUtil dataStructureUtil = new DataStructureUtil(TestSpecFactory.createDefault()); private final BlockImporter blockImporter = mock(BlockImporter.class); + private final BlobsSidecarManager blobsSidecarManager = mock(BlobsSidecarManager.class); private final StubAsyncRunner asyncRunner = new StubAsyncRunner(); private final Batch batch = mock(Batch.class); final SyncSource syncSource = mock(SyncSource.class); - private final BatchImporter importer = new BatchImporter(blockImporter, asyncRunner); + private final BatchImporter importer = + new BatchImporter(blockImporter, blobsSidecarManager, asyncRunner); @BeforeEach public void setup() { when(batch.getSource()).thenReturn(Optional.of(syncSource)); + when(batch.getBlobsSidecarsBySlot()).thenReturn(Collections.emptyMap()); } @Test @@ -74,8 +79,10 @@ void shouldImportBlocksInOrder() { // Should not be started on the calling thread verifyNoInteractions(blockImporter); - // We should have copied the blocks to avoid accessing the Batch data from other threads + // We should have copied the blocks and blobs sidecars to avoid accessing the Batch data from + // other threads verify(batch).getBlocks(); + verify(batch).getBlobsSidecarsBySlot(); verify(batch).getSource(); blocks.clear(); diff --git a/beacon/sync/src/test/java/tech/pegasys/teku/beacon/sync/forward/multipeer/batches/SyncSourceBatchTest.java b/beacon/sync/src/test/java/tech/pegasys/teku/beacon/sync/forward/multipeer/batches/SyncSourceBatchTest.java index d45a4389397..c3d25bcc4ef 100644 --- a/beacon/sync/src/test/java/tech/pegasys/teku/beacon/sync/forward/multipeer/batches/SyncSourceBatchTest.java +++ b/beacon/sync/src/test/java/tech/pegasys/teku/beacon/sync/forward/multipeer/batches/SyncSourceBatchTest.java @@ -41,6 +41,7 @@ import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock; import tech.pegasys.teku.spec.datastructures.blocks.SlotAndBlockRoot; import tech.pegasys.teku.spec.util.DataStructureUtil; +import tech.pegasys.teku.statetransition.blobs.BlobsSidecarManager; public class SyncSourceBatchTest { @@ -51,6 +52,7 @@ public class SyncSourceBatchTest { private final InlineEventThread eventThread = new InlineEventThread(); private final ConflictResolutionStrategy conflictResolutionStrategy = mock(ConflictResolutionStrategy.class); + private final BlobsSidecarManager blobsSidecarManager = mock(BlobsSidecarManager.class); private final Map> syncSources = new HashMap<>(); @Test @@ -194,6 +196,7 @@ void shouldSkipMakingRequestWhenNoTargetPeerIsAvailable() { emptySourceSelector, conflictResolutionStrategy, targetChain, + blobsSidecarManager, UInt64.ONE, UInt64.ONE); @@ -222,6 +225,7 @@ protected Batch createBatch(final long startSlot, final long count) { syncSourceProvider, conflictResolutionStrategy, targetChain, + blobsSidecarManager, UInt64.valueOf(startSlot), UInt64.valueOf(count)); this.syncSources.put(batch, syncSources); diff --git a/beacon/sync/src/testFixtures/java/tech/pegasys/teku/beacon/sync/forward/multipeer/batches/StubBatchFactory.java b/beacon/sync/src/testFixtures/java/tech/pegasys/teku/beacon/sync/forward/multipeer/batches/StubBatchFactory.java index 65a2f51d62e..56a67d42dab 100644 --- a/beacon/sync/src/testFixtures/java/tech/pegasys/teku/beacon/sync/forward/multipeer/batches/StubBatchFactory.java +++ b/beacon/sync/src/testFixtures/java/tech/pegasys/teku/beacon/sync/forward/multipeer/batches/StubBatchFactory.java @@ -29,6 +29,7 @@ import tech.pegasys.teku.networking.eth2.peers.StubSyncSource; import tech.pegasys.teku.networking.eth2.peers.SyncSource; import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock; +import tech.pegasys.teku.statetransition.blobs.BlobsSidecarManager; public class StubBatchFactory extends BatchFactory implements Iterable { private final List batches = new ArrayList<>(); @@ -37,8 +38,8 @@ public class StubBatchFactory extends BatchFactory implements Iterable { private final EventThread eventThread; private final boolean enforceEventThread; - public StubBatchFactory(final EventThread eventThread, final boolean enforceEventThread) { - super(eventThread, null); + public StubBatchFactory(final EventThread eventThread, boolean enforceEventThread) { + super(eventThread, null, BlobsSidecarManager.NOOP); this.eventThread = eventThread; this.enforceEventThread = enforceEventThread; } @@ -121,7 +122,9 @@ public BatchSupport( final TargetChain chain, final UInt64 start, final UInt64 count) { - batch = new SyncSourceBatch(eventThread, this, this, chain, start, count); + batch = + new SyncSourceBatch( + eventThread, this, this, chain, BlobsSidecarManager.NOOP, start, count); eventThreadOnlyBatch = new EventThreadOnlyBatch(eventThread, batch); }