Skip to content

Commit

Permalink
[Decoupling] Block and blob sidecars production for Deneb (part 1) (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
StefanBratanov authored May 19, 2023
1 parent d44d64f commit 5502f7e
Show file tree
Hide file tree
Showing 41 changed files with 282 additions and 645 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public SafeFuture<BeaconBlock> createUnsignedBlock(

public SafeFuture<SignedBeaconBlock> unblindSignedBeaconBlockIfBlinded(
final SignedBeaconBlock blindedSignedBeaconBlock) {
if (blindedSignedBeaconBlock.getMessage().getBody().isBlinded()) {
if (blindedSignedBeaconBlock.isBlinded()) {
return spec.unblindSignedBeaconBlock(
blindedSignedBeaconBlock, operationSelector.createUnblinderSelector());
}
Expand All @@ -74,7 +74,7 @@ public SafeFuture<SignedBeaconBlock> unblindSignedBeaconBlockIfBlinded(

public SignedBeaconBlock blindSignedBeaconBlockIfUnblinded(
final SignedBeaconBlock unblindedSignedBeaconBlock) {
if (unblindedSignedBeaconBlock.getMessage().getBody().isBlinded()) {
if (unblindedSignedBeaconBlock.isBlinded()) {
return unblindedSignedBeaconBlock;
}
return spec.blindSignedBeaconBlock(unblindedSignedBeaconBlock);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import java.util.Optional;
import java.util.Set;
import java.util.function.BiFunction;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.tuweni.bytes.Bytes32;
Expand All @@ -55,10 +54,9 @@
import tech.pegasys.teku.spec.SpecVersion;
import tech.pegasys.teku.spec.datastructures.attestation.ValidateableAttestation;
import tech.pegasys.teku.spec.datastructures.blocks.BeaconBlock;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
import tech.pegasys.teku.spec.datastructures.blocks.BlockContainer;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBlockAndState;
import tech.pegasys.teku.spec.datastructures.blocks.versions.deneb.BlindedBlockContents;
import tech.pegasys.teku.spec.datastructures.blocks.versions.deneb.BlockContents;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBlockContainer;
import tech.pegasys.teku.spec.datastructures.builder.SignedValidatorRegistration;
import tech.pegasys.teku.spec.datastructures.builder.ValidatorRegistration;
import tech.pegasys.teku.spec.datastructures.genesis.GenesisData;
Expand Down Expand Up @@ -289,7 +287,7 @@ public SafeFuture<Optional<Map<BLSPublicKey, ValidatorStatus>>> getValidatorStat
}

@Override
public SafeFuture<Optional<BeaconBlock>> createUnsignedBlock(
public SafeFuture<Optional<BlockContainer>> createUnsignedBlock(
final UInt64 slot,
final BLSSignature randaoReveal,
final Optional<Bytes32> graffiti,
Expand All @@ -306,19 +304,7 @@ public SafeFuture<Optional<BeaconBlock>> createUnsignedBlock(
blockSlotState -> createBlock(slot, randaoReveal, graffiti, blinded, blockSlotState));
}

@Override
public SafeFuture<Optional<BlindedBlockContents>> createUnsignedBlindedBlockContents(
final UInt64 slot, final BLSSignature randaoReveal, final Optional<Bytes32> graffiti) {
throw new NotImplementedException("Not Yet Implemented");
}

@Override
public SafeFuture<Optional<BlockContents>> createUnsignedBlockContents(
final UInt64 slot, final BLSSignature randaoReveal, final Optional<Bytes32> graffiti) {
throw new NotImplementedException("Not Yet Implemented");
}

private SafeFuture<Optional<BeaconBlock>> createBlock(
private SafeFuture<Optional<BlockContainer>> createBlock(
final UInt64 slot,
final BLSSignature randaoReveal,
final Optional<Bytes32> graffiti,
Expand Down Expand Up @@ -572,9 +558,9 @@ private SafeFuture<InternalValidationResult> processAggregateAndProof(

@Override
public SafeFuture<SendSignedBlockResult> sendSignedBlock(
final SignedBeaconBlock maybeBlindedBlock) {
final SignedBlockContainer maybeBlindedBlockContainer) {
return blockPublisher
.sendSignedBlock(maybeBlindedBlock)
.sendSignedBlock(maybeBlindedBlockContainer)
.exceptionally(ex -> SendSignedBlockResult.rejected(ex.getMessage()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.apache.logging.log4j.Logger;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBlockContainer;
import tech.pegasys.teku.spec.logic.common.statetransition.results.BlockImportResult;
import tech.pegasys.teku.spec.logic.common.statetransition.results.BlockImportResult.FailureReason;
import tech.pegasys.teku.statetransition.block.BlockImportChannel;
Expand Down Expand Up @@ -46,8 +47,11 @@ public AbstractBlockPublisher(
this.dutyMetrics = dutyMetrics;
}

// TODO: blinding and unblinding of the SignedBlockContainer
@Override
public SafeFuture<SendSignedBlockResult> sendSignedBlock(SignedBeaconBlock maybeBlindedBlock) {
public SafeFuture<SendSignedBlockResult> sendSignedBlock(
final SignedBlockContainer maybeBlindedBlockContainer) {
final SignedBeaconBlock maybeBlindedBlock = maybeBlindedBlockContainer.getSignedBlock();
return blockFactory
.unblindSignedBeaconBlockIfBlinded(maybeBlindedBlock)
.thenPeek(performanceTracker::saveProducedBlock)
Expand Down Expand Up @@ -78,5 +82,5 @@ public SafeFuture<SendSignedBlockResult> sendSignedBlock(SignedBeaconBlock maybe
}

abstract SafeFuture<BlockImportResult> gossipAndImportUnblindedSignedBlock(
final SignedBeaconBlock block);
final SignedBlockContainer blockContainer);
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@
package tech.pegasys.teku.validator.coordinator.publisher;

import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBlockContainer;
import tech.pegasys.teku.validator.api.SendSignedBlockResult;

public interface BlockPublisher {
SafeFuture<SendSignedBlockResult> sendSignedBlock(SignedBeaconBlock maybeBlindedBlock);
SafeFuture<SendSignedBlockResult> sendSignedBlock(
SignedBlockContainer maybeBlindedBlockContainer);
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@
import tech.pegasys.teku.networking.eth2.gossip.BlobSidecarGossipChannel;
import tech.pegasys.teku.networking.eth2.gossip.BlockGossipChannel;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBlockContainer;
import tech.pegasys.teku.spec.logic.common.statetransition.results.BlockImportResult;
import tech.pegasys.teku.statetransition.blobs.BlobSidecarPool;
import tech.pegasys.teku.statetransition.block.BlockImportChannel;
import tech.pegasys.teku.validator.coordinator.BlockFactory;
import tech.pegasys.teku.validator.coordinator.DutyMetrics;
import tech.pegasys.teku.validator.coordinator.performance.PerformanceTracker;

@SuppressWarnings("unused")
public class BlockPublisherDeneb extends AbstractBlockPublisher {

private final BlobSidecarPool blobSidecarPool;
Expand All @@ -47,7 +47,21 @@ public BlockPublisherDeneb(

@Override
protected SafeFuture<BlockImportResult> gossipAndImportUnblindedSignedBlock(
final SignedBeaconBlock block) {
throw new UnsupportedOperationException("Not yet implemented");
final SignedBlockContainer blockContainer) {
gossipAndImportBlobSidecars(blockContainer);
final SignedBeaconBlock block = blockContainer.getSignedBlock();
blockGossipChannel.publishBlock(block);
return blockImportChannel.importBlock(block);
}

private void gossipAndImportBlobSidecars(final SignedBlockContainer blockContainer) {
blockContainer
.getSignedBlobSidecars()
.ifPresent(
signedBlobSidecars -> {
blobSidecarGossipChannel.publishBlobSidecars(signedBlobSidecars);
blobSidecarPool.onCompletedBlockAndSignedBlobSidecars(
blockContainer.getSignedBlock(), signedBlobSidecars);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.networking.eth2.gossip.BlockGossipChannel;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBlockContainer;
import tech.pegasys.teku.spec.logic.common.statetransition.results.BlockImportResult;
import tech.pegasys.teku.statetransition.block.BlockImportChannel;
import tech.pegasys.teku.validator.coordinator.BlockFactory;
Expand All @@ -37,7 +38,8 @@ public BlockPublisherPhase0(

@Override
protected SafeFuture<BlockImportResult> gossipAndImportUnblindedSignedBlock(
final SignedBeaconBlock block) {
final SignedBlockContainer blockContainer) {
final SignedBeaconBlock block = blockContainer.getSignedBlock();
blockGossipChannel.publishBlock(block);
return blockImportChannel.importBlock(block);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import tech.pegasys.teku.networking.eth2.gossip.BlockGossipChannel;
import tech.pegasys.teku.spec.Spec;
import tech.pegasys.teku.spec.SpecMilestone;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBlockContainer;
import tech.pegasys.teku.statetransition.blobs.BlobSidecarPool;
import tech.pegasys.teku.statetransition.block.BlockImportChannel;
import tech.pegasys.teku.validator.api.SendSignedBlockResult;
Expand Down Expand Up @@ -77,8 +77,9 @@ public MilestoneBasedBlockPublisher(

@Override
public SafeFuture<SendSignedBlockResult> sendSignedBlock(
final SignedBeaconBlock maybeBlindedBlock) {
final SpecMilestone blockMilestone = spec.atSlot(maybeBlindedBlock.getSlot()).getMilestone();
return registeredPublishers.get(blockMilestone).sendSignedBlock(maybeBlindedBlock);
final SignedBlockContainer maybeBlindedBlockContainer) {
final SpecMilestone blockMilestone =
spec.atSlot(maybeBlindedBlockContainer.getSlot()).getMilestone();
return registeredPublishers.get(blockMilestone).sendSignedBlock(maybeBlindedBlockContainer);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
import tech.pegasys.teku.spec.datastructures.attestation.ValidateableAttestation;
import tech.pegasys.teku.spec.datastructures.blobs.versions.deneb.SignedBlobSidecar;
import tech.pegasys.teku.spec.datastructures.blocks.BeaconBlock;
import tech.pegasys.teku.spec.datastructures.blocks.BlockContainer;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBlockAndState;
import tech.pegasys.teku.spec.datastructures.builder.SignedValidatorRegistration;
Expand Down Expand Up @@ -458,7 +459,7 @@ void getSyncCommitteeDuties_shouldNotUseEpochPriorToFork() {
@Test
public void createUnsignedBlock_shouldFailWhenNodeIsSyncing() {
nodeIsSyncing();
final SafeFuture<Optional<BeaconBlock>> result =
final SafeFuture<Optional<BlockContainer>> result =
validatorApiHandler.createUnsignedBlock(
ONE, dataStructureUtil.randomSignature(), Optional.empty(), false);

Expand All @@ -475,7 +476,7 @@ public void createUnsignedBlock_shouldFailWhenParentBlockIsOptimistic() {
final Bytes32 parentRoot = spec.getBlockRootAtSlot(blockSlotState, newSlot.minus(1));
when(chainDataClient.isOptimisticBlock(parentRoot)).thenReturn(true);

final SafeFuture<Optional<BeaconBlock>> result =
final SafeFuture<Optional<BlockContainer>> result =
validatorApiHandler.createUnsignedBlock(
newSlot, dataStructureUtil.randomSignature(), Optional.empty(), false);

Expand All @@ -497,7 +498,7 @@ public void createUnsignedBlock_shouldCreateBlock() {
blockSlotState, newSlot, randaoReveal, Optional.empty(), false))
.thenReturn(SafeFuture.completedFuture(createdBlock));

final SafeFuture<Optional<BeaconBlock>> result =
final SafeFuture<Optional<BlockContainer>> result =
validatorApiHandler.createUnsignedBlock(newSlot, randaoReveal, Optional.empty(), false);

verify(blockFactory)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"required" : [ "blinded_block", "blinded_blob_sidecars" ],
"properties" : {
"blinded_block" : {
"$ref" : "#/components/schemas/BeaconBlockDeneb"
"$ref" : "#/components/schemas/BlindedBlockDeneb"
},
"blinded_blob_sidecars" : {
"type" : "array",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,15 @@
import tech.pegasys.teku.api.DataProvider;
import tech.pegasys.teku.api.SyncDataProvider;
import tech.pegasys.teku.api.ValidatorDataProvider;
import tech.pegasys.teku.infrastructure.async.SafeFuture;
import tech.pegasys.teku.infrastructure.restapi.endpoints.AsyncApiResponse;
import tech.pegasys.teku.infrastructure.restapi.endpoints.EndpointMetadata;
import tech.pegasys.teku.infrastructure.restapi.endpoints.RestApiEndpoint;
import tech.pegasys.teku.infrastructure.restapi.endpoints.RestApiRequest;
import tech.pegasys.teku.spec.Spec;
import tech.pegasys.teku.spec.datastructures.blocks.BlockContainer;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBlockContainer;
import tech.pegasys.teku.spec.logic.common.statetransition.results.BlockImportResult;
import tech.pegasys.teku.spec.schemas.SchemaDefinitionCache;
import tech.pegasys.teku.spec.schemas.SchemaDefinitions;
import tech.pegasys.teku.validator.api.SendSignedBlockResult;

public class PostBlindedBlock extends RestApiEndpoint {
public static final String ROUTE = "/eth/v1/beacon/blinded_blocks";
Expand Down Expand Up @@ -73,25 +71,26 @@ public void handleRequest(final RestApiRequest request) throws JsonProcessingExc
return;
}

BlockContainer requestBody = request.getRequestBody();
final SafeFuture<SendSignedBlockResult> result =
validatorDataProvider.submitSignedBlindedBlock(requestBody);
final SignedBlockContainer requestBody = request.getRequestBody();

request.respondAsync(
result.thenApply(
blockResult -> {
if (blockResult.getRejectionReason().isEmpty()) {
return AsyncApiResponse.respondWithCode(SC_OK);
} else if (blockResult
.getRejectionReason()
.get()
.equals(BlockImportResult.FailureReason.INTERNAL_ERROR.name())) {
return AsyncApiResponse.respondWithError(
SC_INTERNAL_SERVER_ERROR,
"An internal error occurred, check the server logs for more details.");
} else {
return AsyncApiResponse.respondWithCode(SC_ACCEPTED);
}
}));
validatorDataProvider
.submitSignedBlindedBlock(requestBody)
.thenApply(
blockResult -> {
if (blockResult.getRejectionReason().isEmpty()) {
return AsyncApiResponse.respondWithCode(SC_OK);
} else if (blockResult
.getRejectionReason()
.get()
.equals(BlockImportResult.FailureReason.INTERNAL_ERROR.name())) {
return AsyncApiResponse.respondWithError(
SC_INTERNAL_SERVER_ERROR,
"An internal error occurred, check the server logs for more details.");
} else {
return AsyncApiResponse.respondWithCode(SC_ACCEPTED);
}
}));
}

private static EndpointMetadata getEndpointMetaData(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import tech.pegasys.teku.infrastructure.restapi.endpoints.RestApiEndpoint;
import tech.pegasys.teku.infrastructure.restapi.endpoints.RestApiRequest;
import tech.pegasys.teku.spec.Spec;
import tech.pegasys.teku.spec.datastructures.blocks.BlockContainer;
import tech.pegasys.teku.spec.datastructures.blocks.SignedBlockContainer;
import tech.pegasys.teku.spec.logic.common.statetransition.results.BlockImportResult.FailureReason;
import tech.pegasys.teku.spec.schemas.SchemaDefinitionCache;
import tech.pegasys.teku.spec.schemas.SchemaDefinitions;
Expand Down Expand Up @@ -101,11 +101,11 @@ public void handleRequest(final RestApiRequest request) throws JsonProcessingExc
return;
}

final BlockContainer blockContainer = request.getRequestBody();
final SignedBlockContainer requestBody = request.getRequestBody();

request.respondAsync(
validatorDataProvider
.submitSignedBlock(blockContainer)
.submitSignedBlock(requestBody)
.thenApply(
result -> {
if (result.getRejectionReason().isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,21 +114,9 @@ private static EndpointMetadata getEndpointMetaData(

@NotNull
private static Function<SszData, SpecMilestone> getMilestoneSelector(final Spec spec) {
return sszData -> {
if (sszData instanceof BeaconBlock) {
return spec.getForkSchedule().getSpecMilestoneAtSlot(((BeaconBlock) sszData).getSlot());
} else if (sszData instanceof BlindedBlockContents) {
return spec.getForkSchedule()
.getSpecMilestoneAtSlot(((BlindedBlockContents) sszData).getBlock().getSlot());
} else {
throw new UnsupportedOperationException(
String.format(
"Unsupported GetNewBlindedBlock response type. Must be of type %s or %s but got %s",
BeaconBlock.class.getCanonicalName(),
BlindedBlockContents.class.getCanonicalName(),
sszData.getClass().getCanonicalName()));
}
};
return sszData ->
spec.getForkSchedule()
.getSpecMilestoneAtSlot(BlockContainer.fromSszData(sszData).getSlot());
}

private static SerializableOneOfTypeDefinition<BlockContainer> getResponseTypes(
Expand Down Expand Up @@ -180,14 +168,14 @@ private static SerializableTypeDefinition<BeaconBlock> getBeaconBlockResponseTyp
.map(SchemaDefinitionsDeneb::getBlindedBlockContentsSchema),
(blindedBlockContents, milestone) ->
schemaDefinitionCache
.milestoneAtSlot(blindedBlockContents.getBlock().getSlot())
.milestoneAtSlot(blindedBlockContents.getSlot())
.equals(milestone)),
Function.identity())
.withField(
"version",
MILESTONE_TYPE,
blindedBlockContents ->
schemaDefinitionCache.milestoneAtSlot(blindedBlockContents.getBlock().getSlot()))
schemaDefinitionCache.milestoneAtSlot(blindedBlockContents.getSlot()))
.build();
}
}
Loading

0 comments on commit 5502f7e

Please sign in to comment.