Skip to content

Commit

Permalink
resolve merge conflicts
Browse files Browse the repository at this point in the history
  • Loading branch information
eserilev committed Oct 29, 2023
1 parent 229893a commit d2e7790
Showing 1 changed file with 83 additions and 37 deletions.
120 changes: 83 additions & 37 deletions validator_client/src/block_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ use std::time::Duration;
use tokio::sync::mpsc;
use tokio::time::sleep;
use types::{
AbstractExecPayload, BeaconBlock, BlindedPayload, BlockType, EthSpec, FullPayload, Graffiti,
PublicKeyBytes, Slot,
AbstractExecPayload, BlindedPayload, BlockType, EthSpec, FullPayload, Graffiti, PublicKeyBytes,
Slot,
};

#[derive(Debug)]
Expand Down Expand Up @@ -335,10 +335,20 @@ impl<T: SlotClock + 'static, E: EthSpec> BlockService<T, E> {
)
}

// TODO: activate automatically at Deneb
if !self.validator_store.produce_block_v3() {
let deneb_fork_activated = self
.context
.eth2_config
.spec
.deneb_fork_epoch
.and_then(|fork_epoch| {
let current_epoch = self.slot_clock.now()?.epoch(E::slots_per_epoch());
Some(current_epoch >= fork_epoch)
})
.unwrap_or(false);

if !self.validator_store.produce_block_v3() || deneb_fork_activated {
for validator_pubkey in proposers {
let service = self.clone();
let service: BlockService<T, E> = self.clone();
let log = log.clone();
self.inner.context.executor.spawn(
async move {
Expand Down Expand Up @@ -458,8 +468,10 @@ impl<T: SlotClock + 'static, E: EthSpec> BlockService<T, E> {
graffiti: Option<Graffiti>,
validator_pubkey: PublicKeyBytes,
proposer_index: Option<u64>,
block: BeaconBlock<E, Payload>,
block_contents: BlockContents<E, Payload>,
) -> Result<(), BlockError> {
let (block, maybe_blob_sidecars) = block_contents.deconstruct();

info!(
log,
"Received unsigned block";
Expand All @@ -477,11 +489,25 @@ impl<T: SlotClock + 'static, E: EthSpec> BlockService<T, E> {
"Proposer index does not match block proposer. Beacon chain re-orged".to_string(),
));
}
let validator_pubkey_ref = &validator_pubkey;
let signing_timer = metrics::start_timer(&metrics::BLOCK_SIGNING_TIMES);

let signing_time_ms =
Duration::from_secs_f64(signing_timer.map_or(0.0, |t| t.stop_and_record())).as_millis();

info!(
log,
"Publishing signed block";
"slot" => slot.as_u64(),
"signing_time_ms" => signing_time_ms,
);
let self_ref = &self;

let signing_timer = metrics::start_timer(&metrics::BLOCK_SIGNING_TIMES);
let signed_block = match self

let signed_block = match self_ref
.validator_store
.sign_block::<Payload>(validator_pubkey, block, current_slot)
.sign_block::<Payload>(*validator_pubkey_ref, block, current_slot)
.await
{
Ok(block) => block,
Expand All @@ -505,6 +531,36 @@ impl<T: SlotClock + 'static, E: EthSpec> BlockService<T, E> {
}
};

let maybe_signed_blobs = match maybe_blob_sidecars {
Some(blob_sidecars) => {
match self_ref
.validator_store
.sign_blobs::<Payload>(*validator_pubkey_ref, blob_sidecars)
.await
{
Ok(signed_blobs) => Some(signed_blobs),
Err(ValidatorStoreError::UnknownPubkey(pubkey)) => {
// A pubkey can be missing when a validator was recently removed
// via the API.
warn!(
log,
"Missing pubkey for blobs";
"info" => "a validator may have recently been removed from this VC",
"pubkey" => ?pubkey,
"slot" => ?slot
);
return Ok(());
}
Err(e) => {
return Err(BlockError::Recoverable(format!(
"Unable to sign blobs: {:?}",
e
)))
}
}
}
None => None,
};
let signing_time_ms =
Duration::from_secs_f64(signing_timer.map_or(0.0, |t| t.stop_and_record())).as_millis();

Expand All @@ -515,31 +571,23 @@ impl<T: SlotClock + 'static, E: EthSpec> BlockService<T, E> {
"signing_time_ms" => signing_time_ms,
);

let signed_block_contents = SignedBlockContents::from((signed_block, maybe_signed_blobs));

// Publish block with first available beacon node.
//
// Try the proposer nodes first, since we've likely gone to efforts to
// protect them from DoS attacks and they're most likely to successfully
// publish a block.
proposer_fallback
.first_success_try_proposers_first(
RequireSynced::No,
OfflineOnFailure::Yes,
|beacon_node| async {
let _post_timer = metrics::start_timer_vec(
&metrics::BLOCK_SERVICE_TIMES,
&[metrics::BEACON_BLOCK_HTTP_POST],
);
match Payload::block_type() {
BlockType::Blinded => {
beacon_node
.post_beacon_blinded_blocks(&signed_block)
.await
.or_else(|e| handle_block_post_error(e, slot, log))?;
Ok::<_, BlockError>(())
}
BlockType::Full => {
beacon_node
.post_beacon_blocks(&signed_block)
.await
.or_else(|e| handle_block_post_error(e, slot, log))?;
Ok::<_, BlockError>(())
}
}
self.publish_signed_block_contents::<Payload>(
&signed_block_contents,
beacon_node,
)
.await
},
)
.await?;
Expand All @@ -548,10 +596,10 @@ impl<T: SlotClock + 'static, E: EthSpec> BlockService<T, E> {
log,
"Successfully published block";
"block_type" => ?Payload::block_type(),
"deposits" => signed_block.message().body().deposits().len(),
"attestations" => signed_block.message().body().attestations().len(),
"deposits" => signed_block_contents.signed_block().message().body().deposits().len(),
"attestations" => signed_block_contents.signed_block().message().body().attestations().len(),
"graffiti" => ?graffiti.map(|g| g.as_utf8_lossy()),
"slot" => signed_block.slot().as_u64(),
"slot" => signed_block_contents.signed_block().slot().as_u64(),
);

Ok(())
Expand Down Expand Up @@ -646,8 +694,7 @@ impl<T: SlotClock + 'static, E: EthSpec> BlockService<T, E> {
.await??;

match block_response {
eth2::types::ForkVersionedBeaconBlockType::Full(block_response) => {
let block = block_response.data;
eth2::types::ForkVersionedBeaconBlockType::Full(block_contents) => {
self.handle_block_response::<FullPayload<E>>(
log,
proposer_fallback,
Expand All @@ -656,12 +703,11 @@ impl<T: SlotClock + 'static, E: EthSpec> BlockService<T, E> {
graffiti,
validator_pubkey,
proposer_index,
block,
block_contents.data,
)
.await?;
}
eth2::types::ForkVersionedBeaconBlockType::Blinded(block_response) => {
let block: BeaconBlock<E, BlindedPayload<E>> = block_response.data;
eth2::types::ForkVersionedBeaconBlockType::Blinded(block_contents) => {
self.handle_block_response::<BlindedPayload<E>>(
log,
proposer_fallback,
Expand All @@ -670,7 +716,7 @@ impl<T: SlotClock + 'static, E: EthSpec> BlockService<T, E> {
graffiti,
validator_pubkey,
proposer_index,
block,
block_contents.data,
)
.await?;
}
Expand Down

0 comments on commit d2e7790

Please sign in to comment.