From d2e77901735c23732f81daf5d7cfe94c2f562fdb Mon Sep 17 00:00:00 2001 From: Eitan Seri-Levi Date: Sun, 29 Oct 2023 16:44:27 +0200 Subject: [PATCH] resolve merge conflicts --- validator_client/src/block_service.rs | 120 ++++++++++++++++++-------- 1 file changed, 83 insertions(+), 37 deletions(-) diff --git a/validator_client/src/block_service.rs b/validator_client/src/block_service.rs index 094d6729fff..9d9e0e72776 100644 --- a/validator_client/src/block_service.rs +++ b/validator_client/src/block_service.rs @@ -23,8 +23,8 @@ use std::time::Duration; use tokio::sync::mpsc; use tokio::time::sleep; use types::{ - AbstractExecPayload, BeaconBlock, BlindedPayload, BlockType, EthSpec, FullPayload, Graffiti, - PublicKeyBytes, Slot, + AbstractExecPayload, BlindedPayload, BlockType, EthSpec, FullPayload, Graffiti, PublicKeyBytes, + Slot, }; #[derive(Debug)] @@ -335,10 +335,20 @@ impl BlockService { ) } - // TODO: activate automatically at Deneb - if !self.validator_store.produce_block_v3() { + let deneb_fork_activated = self + .context + .eth2_config + .spec + .deneb_fork_epoch + .and_then(|fork_epoch| { + let current_epoch = self.slot_clock.now()?.epoch(E::slots_per_epoch()); + Some(current_epoch >= fork_epoch) + }) + .unwrap_or(false); + + if !self.validator_store.produce_block_v3() || deneb_fork_activated { for validator_pubkey in proposers { - let service = self.clone(); + let service: BlockService = self.clone(); let log = log.clone(); self.inner.context.executor.spawn( async move { @@ -458,8 +468,10 @@ impl BlockService { graffiti: Option, validator_pubkey: PublicKeyBytes, proposer_index: Option, - block: BeaconBlock, + block_contents: BlockContents, ) -> Result<(), BlockError> { + let (block, maybe_blob_sidecars) = block_contents.deconstruct(); + info!( log, "Received unsigned block"; @@ -477,11 +489,25 @@ impl BlockService { "Proposer index does not match block proposer. Beacon chain re-orged".to_string(), )); } + let validator_pubkey_ref = &validator_pubkey; + let signing_timer = metrics::start_timer(&metrics::BLOCK_SIGNING_TIMES); + + let signing_time_ms = + Duration::from_secs_f64(signing_timer.map_or(0.0, |t| t.stop_and_record())).as_millis(); + + info!( + log, + "Publishing signed block"; + "slot" => slot.as_u64(), + "signing_time_ms" => signing_time_ms, + ); + let self_ref = &self; let signing_timer = metrics::start_timer(&metrics::BLOCK_SIGNING_TIMES); - let signed_block = match self + + let signed_block = match self_ref .validator_store - .sign_block::(validator_pubkey, block, current_slot) + .sign_block::(*validator_pubkey_ref, block, current_slot) .await { Ok(block) => block, @@ -505,6 +531,36 @@ impl BlockService { } }; + let maybe_signed_blobs = match maybe_blob_sidecars { + Some(blob_sidecars) => { + match self_ref + .validator_store + .sign_blobs::(*validator_pubkey_ref, blob_sidecars) + .await + { + Ok(signed_blobs) => Some(signed_blobs), + Err(ValidatorStoreError::UnknownPubkey(pubkey)) => { + // A pubkey can be missing when a validator was recently removed + // via the API. + warn!( + log, + "Missing pubkey for blobs"; + "info" => "a validator may have recently been removed from this VC", + "pubkey" => ?pubkey, + "slot" => ?slot + ); + return Ok(()); + } + Err(e) => { + return Err(BlockError::Recoverable(format!( + "Unable to sign blobs: {:?}", + e + ))) + } + } + } + None => None, + }; let signing_time_ms = Duration::from_secs_f64(signing_timer.map_or(0.0, |t| t.stop_and_record())).as_millis(); @@ -515,31 +571,23 @@ impl BlockService { "signing_time_ms" => signing_time_ms, ); + let signed_block_contents = SignedBlockContents::from((signed_block, maybe_signed_blobs)); + + // Publish block with first available beacon node. + // + // Try the proposer nodes first, since we've likely gone to efforts to + // protect them from DoS attacks and they're most likely to successfully + // publish a block. proposer_fallback .first_success_try_proposers_first( RequireSynced::No, OfflineOnFailure::Yes, |beacon_node| async { - let _post_timer = metrics::start_timer_vec( - &metrics::BLOCK_SERVICE_TIMES, - &[metrics::BEACON_BLOCK_HTTP_POST], - ); - match Payload::block_type() { - BlockType::Blinded => { - beacon_node - .post_beacon_blinded_blocks(&signed_block) - .await - .or_else(|e| handle_block_post_error(e, slot, log))?; - Ok::<_, BlockError>(()) - } - BlockType::Full => { - beacon_node - .post_beacon_blocks(&signed_block) - .await - .or_else(|e| handle_block_post_error(e, slot, log))?; - Ok::<_, BlockError>(()) - } - } + self.publish_signed_block_contents::( + &signed_block_contents, + beacon_node, + ) + .await }, ) .await?; @@ -548,10 +596,10 @@ impl BlockService { log, "Successfully published block"; "block_type" => ?Payload::block_type(), - "deposits" => signed_block.message().body().deposits().len(), - "attestations" => signed_block.message().body().attestations().len(), + "deposits" => signed_block_contents.signed_block().message().body().deposits().len(), + "attestations" => signed_block_contents.signed_block().message().body().attestations().len(), "graffiti" => ?graffiti.map(|g| g.as_utf8_lossy()), - "slot" => signed_block.slot().as_u64(), + "slot" => signed_block_contents.signed_block().slot().as_u64(), ); Ok(()) @@ -646,8 +694,7 @@ impl BlockService { .await??; match block_response { - eth2::types::ForkVersionedBeaconBlockType::Full(block_response) => { - let block = block_response.data; + eth2::types::ForkVersionedBeaconBlockType::Full(block_contents) => { self.handle_block_response::>( log, proposer_fallback, @@ -656,12 +703,11 @@ impl BlockService { graffiti, validator_pubkey, proposer_index, - block, + block_contents.data, ) .await?; } - eth2::types::ForkVersionedBeaconBlockType::Blinded(block_response) => { - let block: BeaconBlock> = block_response.data; + eth2::types::ForkVersionedBeaconBlockType::Blinded(block_contents) => { self.handle_block_response::>( log, proposer_fallback, @@ -670,7 +716,7 @@ impl BlockService { graffiti, validator_pubkey, proposer_index, - block, + block_contents.data, ) .await?; }