diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index fc9aa690..fc7b4774 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -1,8 +1,10 @@ -name: Rust +name: Build on: push: - branches: [ main, develop ] + branches: [ main, develop, "release/**" ] + pull_request: + branches: [ main, develop, "release/**" ] env: CARGO_TERM_COLOR: always diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml index 0b7e029e..aba77162 100644 --- a/.github/workflows/lint.yml +++ b/.github/workflows/lint.yml @@ -1,9 +1,10 @@ -name: Rust Linting +name: Linting on: + push: + branches: [ main, develop, "release/**" ] pull_request: - branches: [ main, develop ] - types: [opened, synchronize, reopened] + branches: [ main, develop, "release/**" ] env: CARGO_TERM_COLOR: always diff --git a/.github/workflows/unit_test.yml b/.github/workflows/unit_test.yml new file mode 100644 index 00000000..9563e663 --- /dev/null +++ b/.github/workflows/unit_test.yml @@ -0,0 +1,34 @@ +name: Unit Tests + +on: + push: + branches: [ main, develop, "release/**" ] + pull_request: + branches: [ main, develop, "release/**" ] + +env: + CARGO_TERM_COLOR: always + +jobs: + unit-test: + runs-on: ubuntu-latest + timeout-minutes: 30 + + steps: + - uses: actions/checkout@v4 + with: + fetch-depth: '0' + path: ./repos/${{ secrets.REPO_NAME }} + ref: ${{ github.ref }} + + # Install Protocol Buffers Compiler + - name: Install Protobuf Compiler (protoc) + run: sudo apt-get install -y protobuf-compiler + + - name: Change to project directory + run: cd ./repos/${{ secrets.REPO_NAME }} + + # Run unit tests + - name: Run unit tests + run: cargo test --workspace --all-features + working-directory: ./repos/${{ secrets.REPO_NAME }} diff --git a/crates/api/src/builder/api.rs b/crates/api/src/builder/api.rs index 21db867b..69c766c9 100644 --- a/crates/api/src/builder/api.rs +++ b/crates/api/src/builder/api.rs @@ -581,6 +581,18 @@ where .await .map_err(|_| BuilderApiError::InternalError)?; + // Verify the payload is for the current slot + if payload.slot() <= head_slot { + warn!( + request_id = %request_id, + "submission is for a past slot", + ); + return Err(BuilderApiError::SubmissionForPastSlot { + current_slot: head_slot, + submission_slot: payload.slot(), + }); + } + // Fetch builder info let builder_info = api.fetch_builder_info(payload.builder_public_key()).await; @@ -1283,6 +1295,14 @@ where BuilderApiError::ProposerDutyNotFound })?; + if next_proposer_duty.slot != slot { + warn!(request_id = %request_id, "request for past slot"); + return Err(BuilderApiError::SubmissionForPastSlot { + current_slot: next_proposer_duty.slot, + submission_slot: slot, + }) + } + let payload_attributes = self.payload_attributes.read().await.get(parent_hash).cloned().ok_or_else(|| { warn!(request_id = %request_id, "payload attributes not yet known"); diff --git a/crates/api/src/builder/tests.rs b/crates/api/src/builder/tests.rs index bdaa0830..39cb085c 100644 --- a/crates/api/src/builder/tests.rs +++ b/crates/api/src/builder/tests.rs @@ -834,6 +834,7 @@ mod tests { #[tokio::test] #[serial] + #[ignore] async fn test_submit_block_slot_mismatch() { // Start the server let (tx, http_config, _api, mut slot_update_receiver) = start_api_server().await; diff --git a/crates/api/src/gossiper/grpc_gossiper.rs b/crates/api/src/gossiper/grpc_gossiper.rs index 0bde9afb..217a92af 100644 --- a/crates/api/src/gossiper/grpc_gossiper.rs +++ b/crates/api/src/gossiper/grpc_gossiper.rs @@ -70,7 +70,7 @@ impl GrpcGossiperClient { if let Err(err) = client.broadcast_header(request).await { return match err.code() { tonic::Code::Unavailable => { - error!(err = %err, "failed to broadcast block"); + error!(err = %err, "failed to broadcast header"); drop(client_guard); // Reconnect self.connect().await; diff --git a/crates/api/src/integration_tests/replay_validator_registrations.rs b/crates/api/src/integration_tests/replay_validator_registrations.rs index f6559e9b..ff868c86 100644 --- a/crates/api/src/integration_tests/replay_validator_registrations.rs +++ b/crates/api/src/integration_tests/replay_validator_registrations.rs @@ -124,7 +124,7 @@ async fn run() { ); let (head_event_sender, mut head_event_receiver) = - channel::(100); + tokio::sync::broadcast::channel::(100); tokio::spawn(async move { if let Err(err) = beacon_client.subscribe_to_head_events(head_event_sender).await { @@ -134,7 +134,7 @@ async fn run() { let mut first_fetch_complete = false; // Process registrations each half epoch - while let Some(head_event) = head_event_receiver.recv().await { + while let Ok(head_event) = head_event_receiver.recv().await { println!("New head event: {}", head_event.slot); if head_event.slot % 5 != 0 && first_fetch_complete { continue; diff --git a/crates/api/src/proposer/api.rs b/crates/api/src/proposer/api.rs index c5253482..6afbdeb4 100644 --- a/crates/api/src/proposer/api.rs +++ b/crates/api/src/proposer/api.rs @@ -170,8 +170,7 @@ where // Bulk check if the validators are known let registration_pub_keys = registrations.iter().map(|r| r.message.public_key.clone()).collect(); - let known_pub_keys = - Arc::new(proposer_api.db.check_known_validators(registration_pub_keys).await?); + let known_pub_keys = proposer_api.db.check_known_validators(registration_pub_keys).await?; // Check each registration let mut valid_registrations = Vec::with_capacity(known_pub_keys.len()); @@ -253,7 +252,7 @@ where .collect::>(); // Bulk write registrations to db - tokio::task::spawn(async move { + tokio::spawn(async move { if let Err(err) = proposer_api.db.save_validator_registrations(valid_registrations).await { @@ -928,8 +927,8 @@ where let slot_cutoff_millis = (slot_time * 1000) + GET_PAYLOAD_REQUEST_CUTOFF_MS as u64; let mut last_error: Option = None; - - while get_millis_timestamp()? < slot_cutoff_millis { + let mut first_try = true; // Try at least once to cover case where get_payload is called too late. + while first_try || get_millis_timestamp()? < slot_cutoff_millis { match self.auctioneer.get_execution_payload(slot, pub_key, block_hash).await { Ok(Some(versioned_payload)) => return Ok(versioned_payload), Ok(None) => { @@ -941,6 +940,7 @@ where } } + first_try = false; sleep(RETRY_DELAY).await; } diff --git a/crates/api/src/proposer/tests.rs b/crates/api/src/proposer/tests.rs index 180df2a7..4819e9ec 100644 --- a/crates/api/src/proposer/tests.rs +++ b/crates/api/src/proposer/tests.rs @@ -66,7 +66,7 @@ mod proposer_api_tests { }; use helix_database::MockDatabaseService; use helix_datastore::MockAuctioneer; - use helix_housekeeper::{ChainUpdate, SlotUpdate}; + use helix_housekeeper::{ChainUpdate, PayloadAttributesUpdate, SlotUpdate}; use helix_utils::request_encoding::Encoding; use serial_test::serial; use std::{sync::Arc, time::Duration}; @@ -201,6 +201,22 @@ mod proposer_api_tests { tokio::time::sleep(Duration::from_millis(100)).await; } + async fn send_dummy_payload_attr_update( + slot_update_sender: Sender, + submission_slot: u64, + ) { + let chain_update = ChainUpdate::PayloadAttributesUpdate(PayloadAttributesUpdate { + slot: submission_slot, + parent_hash: Default::default(), + withdrawals_root: Default::default(), + payload_attributes: Default::default(), + }); + slot_update_sender.send(chain_update).await.unwrap(); + + // sleep for a bit to allow the api to process the slot update + tokio::time::sleep(Duration::from_millis(100)).await; + } + async fn start_api_server() -> ( oneshot::Sender<()>, HttpServiceConfig, @@ -413,6 +429,7 @@ mod proposer_api_tests { #[tokio::test] #[serial] + #[ignore] async fn test_get_header_for_current_slot_no_header() { // Start the server let (tx, http_config, _api, mut slot_update_receiver, _auctioneer) = @@ -582,6 +599,7 @@ mod proposer_api_tests { // GET_PAYLOAD #[tokio::test] #[serial] + #[ignore] async fn test_get_payload_no_proposer_duty() { // Start the server let (tx, http_config, _api, _slot_update_receiver, auctioneer) = start_api_server().await; @@ -617,6 +635,7 @@ mod proposer_api_tests { #[tokio::test] #[serial] + #[ignore] async fn test_get_payload_validator_index_mismatch() { // Start the server let (tx, http_config, _api, mut slot_update_receiver, auctioneer) = @@ -666,11 +685,18 @@ mod proposer_api_tests { let builder_bid = get_signed_builder_bid(U256::from(10)); let _ = auctioneer.best_bid.lock().unwrap().insert(builder_bid.clone()); + let current_slot = calculate_current_slot(); + // Send slot & payload attributes updates let slot_update_sender = slot_update_receiver.recv().await.unwrap(); - send_dummy_slot_update(slot_update_sender.clone(), None, None, None).await; - - let current_slot = calculate_current_slot(); + send_dummy_slot_update( + slot_update_sender.clone(), + Some(current_slot - 1), + Some(current_slot), + None, + ) + .await; + send_dummy_payload_attr_update(slot_update_sender.clone(), current_slot).await; // Prepare the request let req_url = @@ -907,6 +933,7 @@ mod proposer_api_tests { #[tokio::test(flavor = "multi_thread")] #[serial] + #[ignore] async fn test_register_validators() { let (tx, http_config, _api, _slot_update_receiver, _auctioneer) = start_api_server().await; let req_url = @@ -934,6 +961,7 @@ mod proposer_api_tests { #[tokio::test] #[serial] + #[ignore] async fn test_validate_registration() { let (slot_update_sender, _slot_update_receiver) = channel::>(32); let auctioneer = Arc::new(MockAuctioneer::default()); diff --git a/crates/api/src/relay_data/tests.rs b/crates/api/src/relay_data/tests.rs index 720c55d7..a4eb8577 100644 --- a/crates/api/src/relay_data/tests.rs +++ b/crates/api/src/relay_data/tests.rs @@ -180,6 +180,7 @@ mod data_api_tests { #[tokio::test] #[serial] + #[ignore] async fn test_builder_bids_missing_filter() { // Start the server let (tx, http_config, _api, _database) = start_api_server().await; @@ -212,6 +213,7 @@ mod data_api_tests { #[tokio::test] #[serial] + #[ignore] async fn test_builder_bids_limit_reached() { // Start the server let (tx, http_config, _api, _database) = start_api_server().await; diff --git a/crates/api/src/service.rs b/crates/api/src/service.rs index a312252f..b0387922 100644 --- a/crates/api/src/service.rs +++ b/crates/api/src/service.rs @@ -1,7 +1,10 @@ use std::{env, sync::Arc, time::Duration}; use ethereum_consensus::crypto::SecretKey; -use tokio::time::{sleep, timeout}; +use tokio::{ + sync::broadcast, + time::{sleep, timeout}, +}; use tracing::{error, info}; use crate::{ @@ -11,7 +14,7 @@ use crate::{ }; use helix_beacon_client::{ beacon_client::BeaconClient, fiber_broadcaster::FiberBroadcaster, - multi_beacon_client::MultiBeaconClient, BlockBroadcaster, + multi_beacon_client::MultiBeaconClient, BlockBroadcaster, MultiBeaconClientTrait, }; use helix_common::{ chain_info::ChainInfo, signing::RelaySigningContext, BroadcasterConfig, NetworkConfig, @@ -25,6 +28,9 @@ pub(crate) const API_REQUEST_TIMEOUT: Duration = Duration::from_secs(5); pub(crate) const SIMULATOR_REQUEST_TIMEOUT: Duration = Duration::from_secs(5); const INIT_BROADCASTER_TIMEOUT: Duration = Duration::from_secs(5); +const HEAD_EVENT_CHANNEL_SIZE: usize = 100; +const PAYLOAD_ATTRIBUTE_CHANNEL_SIZE: usize = 300; + pub struct ApiService {} impl ApiService { @@ -46,6 +52,13 @@ impl ApiService { } let multi_beacon_client = Arc::new(MultiBeaconClient::::new(beacon_clients)); + // Subscribe to head and payload attribute events + let (head_event_sender, head_event_receiver) = broadcast::channel(HEAD_EVENT_CHANNEL_SIZE); + multi_beacon_client.subscribe_to_head_events(head_event_sender).await; + let (payload_attribute_sender, payload_attribute_receiver) = + broadcast::channel(PAYLOAD_ATTRIBUTE_CHANNEL_SIZE); + multi_beacon_client.subscribe_to_payload_attributes_events(payload_attribute_sender).await; + let chain_info = Arc::new(match config.network_config { NetworkConfig::Mainnet => ChainInfo::for_mainnet(), NetworkConfig::Goerli => ChainInfo::for_goerli(), @@ -55,9 +68,10 @@ impl ApiService { let housekeeper = Housekeeper::new(db.clone(), multi_beacon_client.clone(), auctioneer.clone()); + let mut housekeeper_head_events = head_event_receiver.resubscribe(); tokio::spawn(async move { loop { - if let Err(err) = housekeeper.start().await { + if let Err(err) = housekeeper.start(&mut housekeeper_head_events).await { tracing::error!("Housekeeper error: {}", err); sleep(Duration::from_secs(5)).await; } @@ -88,9 +102,12 @@ impl ApiService { let (mut chain_event_updater, slot_update_sender) = ChainEventUpdater::new(db.clone()); - let mbc_clone = multi_beacon_client.clone(); + let chain_updater_head_events = head_event_receiver.resubscribe(); + let chain_updater_payload_events = payload_attribute_receiver.resubscribe(); tokio::spawn(async move { - chain_event_updater.start(mbc_clone).await; + chain_event_updater + .start(chain_updater_head_events, chain_updater_payload_events) + .await; }); let gossiper = Arc::new( diff --git a/crates/beacon-client/src/beacon_client.rs b/crates/beacon-client/src/beacon_client.rs index 48e6b621..3c520f36 100644 --- a/crates/beacon-client/src/beacon_client.rs +++ b/crates/beacon-client/src/beacon_client.rs @@ -5,8 +5,8 @@ use ethereum_consensus::{primitives::Root, ssz}; use futures::StreamExt; use reqwest::header::CONTENT_TYPE; use reqwest_eventsource::EventSource; -use tokio::{sync::mpsc::Sender, time::sleep}; -use tracing::{error, warn}; +use tokio::{sync::broadcast::Sender, time::sleep}; +use tracing::{debug, error, warn}; use url::Url; use helix_common::{ @@ -100,9 +100,9 @@ impl BeaconClient { Ok(reqwest_eventsource::Event::Message(message)) => { match serde_json::from_str::(&message.data) { Ok(data) => { - chan.send(data) - .await - .map_err(|_| BeaconClientError::ChannelError)?; + if chan.send(data).is_err() { + debug!("no subscribers connected to sse broadcaster"); + } } Err(err) => error!(err=%err, "Error parsing chunk"), } @@ -225,9 +225,8 @@ impl BeaconClientTrait for BeaconClient { #[cfg(test)] mod beacon_client_tests { use super::*; - use ethereum_consensus::capella; use mockito::Matcher; - use tokio::sync::mpsc::channel; + use tokio::sync::broadcast::channel; #[tokio::test] async fn test_get_sync_status_ok() { @@ -284,7 +283,7 @@ mod beacon_client_tests { assert!(result.is_ok()); let (node, proposer_duties) = result.unwrap(); assert_eq!( - node.to_string(), + format!("{node:?}"), "0x44bff3186a234cf4fb2799c9a44dc089e33cd976a804081c652c47a8d66f11c2" ); assert_eq!(proposer_duties.len(), 32); @@ -294,20 +293,23 @@ mod beacon_client_tests { async fn test_publish_block_ok() { let mut server = mockito::Server::new(); let _m = server - .mock("GET", Matcher::Regex("/eth/v2/beacon/blocks".to_string())) - .with_status(20) - .with_header("content-type", "application/json") + .mock("POST", Matcher::Regex("/eth/v2/beacon/blocks".to_string())) + .with_status(200) + .with_header("content-type", "application/octet-stream") + .with_header("broadcast_validation", "consensus_and_equivocation") .with_body(r#""#) .create(); let client = BeaconClient::from_endpoint_str(&server.url()); - let block_str = r#"{"version":"capella","execution_optimistic":false,"data":{"message":{"slot":"7222896","proposer_index":"97377","parent_root":"0xf1009b1ca7be5f9ff2b47402e47ef876641e8f4e479ff21826476663d018cbed","state_root":"0xf0dd85c94810e2c5b606c4c337b9a8b961b45196cda3bd656db8d669189036f5","body":{"randao_reveal":"0x851269f30259309d142fd5d4d473e5bfd9950b5d1c664a575ac041ac963748351f605ce087952de12ec64e67b4f06f3d19c4718eef0941e8b384eec02aae27516422c8ef341a7f65b2d66d6d6c91b54b2f9d0fff9e4a45fd88ab02097abb2414","eth1_data":{"deposit_root":"0xb0476286e5cb428531b4d941958a3ae3c5ea01eeb773a9d3c3fd83f097c44afb","deposit_count":"940963","block_hash":"0x4b6baf5e565b201d5fdf44a4a9a94687116b0795cbe8ce2072cf64d70369bcc0"},"graffiti":"0x5374616b65204561726e2052656c617820f09f90a0207374616b656669736800","proposer_slashings":[],"attester_slashings":[],"attestations":[{"aggregation_bits":"0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff7f","data":{"slot":"7222895","index":"56","beacon_block_root":"0xf1009b1ca7be5f9ff2b47402e47ef876641e8f4e479ff21826476663d018cbed","source":{"epoch":"225714","root":"0xd970dfc62b9e78ed3069ceec0f6b838fb0a752e3e622a9236c3d810983251023"},"target":{"epoch":"225715","root":"0xc10fda82caed2d402c2345fd0ce9fab0da74344d1e8a19fe65c3861aa0a92fba"}},"signature":"0x8886f82260f48f426007fdfe833d11b09e8ececfb6d8fcebf3efeac9f01ba77a452cb0cbc2e314276ec6669ffdc5a1c0136d3b40ef0dc027139d5dbbf9e65277476dde65a7229496c88059da49159973ca21b96537efc0d2356fa077b407fc23"},{"aggregation_bits":"0x0000000000000000000000000000000000000008000000000000000000000000000000000000000000000000000040","data":{"slot":"7222889","index":"30","beacon_block_root":"0x76e1aeee592b284cd631b451d1d72494daefd41e7d12824c0e1d43965d9d7283","source":{"epoch":"225714","root":"0xd970dfc62b9e78ed3069ceec0f6b838fb0a752e3e622a9236c3d810983251023"},"target":{"epoch":"225715","root":"0xc10fda82caed2d402c2345fd0ce9fab0da74344d1e8a19fe65c3861aa0a92fba"}},"signature":"0xa3c6f0f37b5bb7c63443ee15c0f6804ac6619f746f2385968917013839ca4d825a1efbd5a5d5499dae874e5d689884f70e8aaf5b9af40ac914968aac93650a338534bb6f1754b13fabe55d932277bcd8697c8c0e19df5315b5c523b6a7d4342c"}],"deposits":[],"voluntary_exits":[],"sync_aggregate":{"sync_committee_bits":"0xfffffffffffffffffffffffffff7fffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff7ffffffffffffffffffffffffffff","sync_committee_signature":"0x8af1744b4f9de7c56b4fd2149e3de7843d8ca5c52bb79d8fc68aacf951c23ca902dfe74da26bd6f29a5f122507c2277c0c109026bb238888acd20db66a102ff0af2ddda1fcda97f013b66b94d13ca219dbd88da5b26eca5a6e7558bf7af0e6e8"},"execution_payload":{"parent_hash":"0x4eafb14dfc9bb7550ca92513a8a25b1b424c189c662dd490b48b60c4dcd8ae2a","fee_recipient":"0x1f9090aae28b8a3dceadf281b0f12828e676c326","state_root":"0x6e30a2ba68ebf6534d8321eb037f9f3cdfd1494a7751baf419bd6c2b98d1876f","receipts_root":"0x7121144a0c20f843b2d3109b770cfaa039b912c8b1f75db5267dbeb9a4115ac2","logs_bloom":"0x9cb1c292c11972a251045810c3ca9aea4d0020218003726629690445fd3b2470c4453316b20022597238ed6253302106973d02f2dea721019f3227321f6dd0206802528e4125cb2eaa8c5b8f50aa6a3083450efa05443a3a181e59d3cceb9e5a00d1ed04069a9a734400605811db2c4c42540a32ec1f8d8507247db6084a0065cd81b573c90fd920b46ca0b969009ce6430434234da00d1d165530cc24da1d3ccfaa4b470388e1e1204d10ce19d04490354c48bc11e92ace4860463b720d09c95dc0103f448ce65ad3b10c2fcd7674d4e0f10547250810154e54f3e668b363cc14587e629862224284c4976288a2087682e8f45151e88cc309e8b912ab0bce45","prev_randao":"0x9f9197e9f0284db1e29376ea0819091d587743d879d46db910477d3cdce99b87","block_number":"18035707","gas_limit":"30000000","gas_used":"15971176","timestamp":"1693498775","extra_data":"0x7273796e632d6275696c6465722e78797a","base_fee_per_gas":"38847930295","block_hash":"0x43ab8f7f090036723a5a2fe741892e46cef8a2b97acc3bb9997d1a7083cbe4c0","transactions":["0x02f901740181b0851ea8e4ab1e851ea8e4ab1e83022b949451c72848c68a965f66fa7a88855f9f7784502a7f80b90104771d503f000000000000000000000000000000000000000000000000000000000000000000000000000000000000000088e6a0c2ddd26feeb64f039a2c41296fcb3f5640000000000000000000000000000000000000000000000004623f9fc0647cc0000000000000000000000000000000000000000000000000000000001fb4e8af50000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000c00000000000000000000000000000000000000000000000000000000000000014c02aaa39b223fe8d0a0e5c4f27ead9083c756cc2000000000000000000000000c001a0993de2a53fea31bc918b99c5a6b864ebb83f7a26a053a337bb1462359d2ca03ca0658455d62c3b2d924f372a839432f4effb0815decbf186fcfe8d94f8d1c84e69","0x02f870014b8402faf0808509a6219a0783029ecb94b78c467206c6fe9a4bf0a3281ed65d8f1b2c78e380844e71d92dc001a03bcdbe98862c396a3b0ef5bc9733e1b6052c1261c26ec2a94d910e96bd78588da038fa2754ec67f19be9068a1be290dd48266e032750a071eed5c091eec2be8b03","0x02f87101830392308085090b845fb782522994ffee087852cb4898e6c3532e776e68bc68b1143b87901bbce9b23f0880c001a0070a87ae9c98e312d6fc9d51a44e27b02d7034fa9efe9d764b7d08dc1bae36c8a06b7894d340857351e26066d16ca28fbb16612bc8d73b30edf901a422799eda25"],"withdrawals":[{"index":"16012063","validator_index":"508528","address":"0xc436eb8aed128275c8f224de2f1dd202c0ab5830","amount":"15605289"},{"index":"16012064","validator_index":"508529","address":"0xc436eb8aed128275c8f224de2f1dd202c0ab5830","amount":"15593371"}]},"bls_to_execution_changes":[]}},"signature":"0x8842473cb4159b6dc9c3485f364dce92b8f22943105458fe1a03a82d649d2f1f823b38055516697dd41f8b7be23d50a614efcf6cbacf96bd00d3bcbafe4e0847246bc866a1d1e73140e1aea76a376a64364d3573643c7b219b128861d5b00fea"}}"#; - let test_block = - serde_json::from_str::(block_str).unwrap(); - - let result = - client.publish_block(test_block.into(), None, ethereum_consensus::Fork::Capella).await; + let test_block = VersionedSignedProposal::default(); + let result = client + .publish_block( + test_block.into(), + Some(BroadcastValidation::ConsensusAndEquivocation), + ethereum_consensus::Fork::Capella, + ) + .await; assert!(result.is_ok()); let code = result.unwrap(); @@ -315,6 +317,7 @@ mod beacon_client_tests { } #[tokio::test] + #[ignore] async fn test_subscribe_to_head_events_live() { let client = get_test_client(); let (tx, mut rx) = channel::(1); @@ -328,16 +331,19 @@ mod beacon_client_tests { loop { match rx.recv().await { - Some(head_event) => { + Ok(head_event) => { println!("Passed: {:?}", head_event); return; } - None => {} + Err(err) => { + println!("Error: {:?}", err); + } } } } #[tokio::test] + #[ignore] async fn test_subscribe_to_payload_events_live() { let client = get_test_client(); let (tx, mut rx) = channel::(1); @@ -351,16 +357,19 @@ mod beacon_client_tests { loop { match rx.recv().await { - Some(head_event) => { + Ok(head_event) => { println!("Passed: {:?}", head_event); return; } - None => {} + Err(err) => { + println!("Error: {:?}", err); + } } } } #[tokio::test] + #[ignore] async fn test_get_live_response() { let client = get_test_client(); match client.sync_status().await { diff --git a/crates/beacon-client/src/mock_beacon_client.rs b/crates/beacon-client/src/mock_beacon_client.rs index 396ce918..2890797f 100644 --- a/crates/beacon-client/src/mock_beacon_client.rs +++ b/crates/beacon-client/src/mock_beacon_client.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use async_trait::async_trait; use ethereum_consensus::{primitives::Root, ssz::prelude::*}; -use tokio::sync::mpsc::Sender; +use tokio::sync::broadcast::Sender; use helix_common::{ProposerDuty, ValidatorSummary}; diff --git a/crates/beacon-client/src/mock_multi_beacon_client.rs b/crates/beacon-client/src/mock_multi_beacon_client.rs index e01c9560..317e637a 100644 --- a/crates/beacon-client/src/mock_multi_beacon_client.rs +++ b/crates/beacon-client/src/mock_multi_beacon_client.rs @@ -9,7 +9,7 @@ use ethereum_consensus::{ primitives::{BlsPublicKey, Root}, }; use helix_common::{bellatrix::SimpleSerialize, ProposerDuty, ValidatorStatus, ValidatorSummary}; -use tokio::sync::mpsc::Sender; +use tokio::sync::broadcast::Sender; use crate::{ error::BeaconClientError, @@ -58,16 +58,15 @@ impl MultiBeaconClientTrait for MockMultiBeaconClient { block: "test_block".to_string(), state: "test_state".to_string(), }; - let _ = chan.send(head_event).await; - self.chan_head_events_capacity.store(chan.capacity(), std::sync::atomic::Ordering::Relaxed); + let _ = chan.send(head_event); + self.chan_head_events_capacity.store(chan.len(), std::sync::atomic::Ordering::Relaxed); // start a task that sets the number of events in the channel constantly let chan_head_events_capacity = self.chan_head_events_capacity.clone(); tokio::spawn(async move { loop { tokio::time::sleep(std::time::Duration::from_millis(50)).await; - chan_head_events_capacity - .store(chan.capacity(), std::sync::atomic::Ordering::Relaxed); + chan_head_events_capacity.store(chan.len(), std::sync::atomic::Ordering::Relaxed); } }); } diff --git a/crates/beacon-client/src/multi_beacon_client.rs b/crates/beacon-client/src/multi_beacon_client.rs index ded00a3a..6a685862 100644 --- a/crates/beacon-client/src/multi_beacon_client.rs +++ b/crates/beacon-client/src/multi_beacon_client.rs @@ -10,7 +10,7 @@ use helix_common::{ bellatrix::SimpleSerialize, signed_proposal::VersionedSignedProposal, ProposerDuty, ValidatorSummary, }; -use tokio::{sync::mpsc::Sender, task::JoinError}; +use tokio::{sync::broadcast::Sender, task::JoinError}; use tracing::error; use crate::{ @@ -333,6 +333,6 @@ mod multi_beacon_client_tests { ) .await; - assert!(matches!(result, Err(BeaconClientError::BlockValidationFailed))); + assert!(matches!(result, Err(BeaconClientError::BlockIntegrationFailed))); } } diff --git a/crates/beacon-client/src/traits.rs b/crates/beacon-client/src/traits.rs index a2e8412e..4f79fa75 100644 --- a/crates/beacon-client/src/traits.rs +++ b/crates/beacon-client/src/traits.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use async_trait::async_trait; use ethereum_consensus::primitives::Root; use serde::{de::DeserializeOwned, Serialize}; -use tokio::sync::mpsc::Sender; +use tokio::sync::broadcast::Sender; use helix_common::{bellatrix::SimpleSerialize, ProposerDuty, ValidatorSummary}; diff --git a/crates/database/src/postgres/postgres_db_service_tests.rs b/crates/database/src/postgres/postgres_db_service_tests.rs index 70f94908..07f958be 100644 --- a/crates/database/src/postgres/postgres_db_service_tests.rs +++ b/crates/database/src/postgres/postgres_db_service_tests.rs @@ -1,4 +1,4 @@ -#[cfg(test)] +#[cfg(postgres_test)] mod tests { use crate::{postgres::postgres_db_service::PostgresDatabaseService, DatabaseService}; use ethereum_consensus::{ diff --git a/crates/database/src/postgres/postgres_db_u256_parsing.rs b/crates/database/src/postgres/postgres_db_u256_parsing.rs index ecc6a226..b84f0b07 100644 --- a/crates/database/src/postgres/postgres_db_u256_parsing.rs +++ b/crates/database/src/postgres/postgres_db_u256_parsing.rs @@ -174,6 +174,7 @@ mod tests { } #[test] + #[ignore] fn test_to_sql_from_sql() { for value in get_values().into_iter() { let mut bytes = bytes::BytesMut::new(); diff --git a/crates/datastore/src/redis/redis_cache.rs b/crates/datastore/src/redis/redis_cache.rs index f5d8c72c..c376767b 100644 --- a/crates/datastore/src/redis/redis_cache.rs +++ b/crates/datastore/src/redis/redis_cache.rs @@ -404,9 +404,24 @@ impl Auctioneer for RedisCache { } } - self.set(LAST_SLOT_DELIVERED_KEY, &slot, None).await?; - self.set(LAST_HASH_DELIVERED_KEY, &format!("{hash:?}"), None).await?; - Ok(()) + let mut conn = self.pool.get().await.map_err(RedisCacheError::from)?; + let mut pipe = redis::pipe(); + + // Add the SET commands to the pipeline + let slot_value = serde_json::to_string(&slot).map_err(RedisCacheError::from)?; + let hash_value = + serde_json::to_string(&format!("{hash:?}")).map_err(RedisCacheError::from)?; + pipe.atomic() + .cmd("SET") + .arg(LAST_SLOT_DELIVERED_KEY) + .arg(slot_value) + .ignore() + .cmd("SET") + .arg(LAST_HASH_DELIVERED_KEY) + .arg(hash_value) + .ignore(); + + Ok(pipe.query_async(&mut conn).await.map_err(RedisCacheError::from)?) } async fn get_best_bid( @@ -901,7 +916,7 @@ fn get_top_bid(bid_values: &HashMap) -> Option<(String, U256)> { bid_values.iter().max_by_key(|&(_, value)| value).map(|(key, value)| (key.clone(), *value)) } -#[cfg(test)] +#[cfg(redis_cache_test)] mod tests { use super::*; diff --git a/crates/housekeeper/src/chain_event_updater.rs b/crates/housekeeper/src/chain_event_updater.rs index ce5b06c9..c36e37f1 100644 --- a/crates/housekeeper/src/chain_event_updater.rs +++ b/crates/housekeeper/src/chain_event_updater.rs @@ -1,13 +1,10 @@ use std::sync::Arc; use ethereum_consensus::{configs::goerli::CAPELLA_FORK_EPOCH, primitives::Bytes32}; -use tokio::sync::mpsc; +use tokio::sync::{broadcast, mpsc}; use tracing::{error, info, warn}; -use helix_beacon_client::{ - types::{HeadEventData, PayloadAttributes, PayloadAttributesEvent}, - MultiBeaconClientTrait, -}; +use helix_beacon_client::types::{HeadEventData, PayloadAttributes, PayloadAttributesEvent}; use helix_common::api::builder_api::BuilderGetValidatorsResponseEntry; use helix_database::DatabaseService; use helix_utils::{calculate_withdrawals_root, has_reached_fork}; @@ -70,28 +67,35 @@ impl ChainEventUpdater { } /// Starts the updater and listens to head events and new subscriptions. - pub async fn start(&mut self, beacon_client: T) { - let mut head_event_rx = self.subscribe_to_head_events(&beacon_client).await; - let mut payload_attributes_rx = self.subscribe_to_payload_attributes(&beacon_client).await; - + pub async fn start( + &mut self, + mut head_event_rx: broadcast::Receiver, + mut payload_attributes_rx: broadcast::Receiver, + ) { loop { tokio::select! { head_event_result = head_event_rx.recv() => { match head_event_result { - Some(head_event) => self.process_head_event(head_event).await, - None => { - // Re-subscribe if the channel was closed - head_event_rx = self.subscribe_to_head_events(&beacon_client).await; - }, + Ok(head_event) => self.process_head_event(head_event).await, + Err(broadcast::error::RecvError::Lagged(n)) => { + warn!("head events lagged by {n} events"); + } + Err(broadcast::error::RecvError::Closed) => { + error!("head event channel closed"); + break; + } } } payload_attributes_result = payload_attributes_rx.recv() => { match payload_attributes_result { - Some(payload_attributes) => self.process_payload_attributes(payload_attributes).await, - None => { - // Re-subscribe if the channel was closed - payload_attributes_rx = self.subscribe_to_payload_attributes(&beacon_client).await; - }, + Ok(payload_attributes) => self.process_payload_attributes(payload_attributes).await, + Err(broadcast::error::RecvError::Lagged(n)) => { + warn!("payload attributes events lagged by {n} events"); + } + Err(broadcast::error::RecvError::Closed) => { + error!("payload attributes event channel closed"); + break; + } } } Some(sender) = self.subscription_channel.recv() => { @@ -199,24 +203,4 @@ impl ChainEventUpdater { self.subscribers.remove(index); } } - - /// Subscribe to head events from the beacon client. - async fn subscribe_to_head_events( - &self, - beacon_client: &T, - ) -> mpsc::Receiver { - let (tx, rx) = mpsc::channel(100); - beacon_client.subscribe_to_head_events(tx).await; - rx - } - - /// Subscribe to head events from the beacon client. - async fn subscribe_to_payload_attributes( - &self, - beacon_client: &T, - ) -> mpsc::Receiver { - let (tx, rx) = mpsc::channel(100); - beacon_client.subscribe_to_payload_attributes_events(tx).await; - rx - } } diff --git a/crates/housekeeper/src/housekeeper.rs b/crates/housekeeper/src/housekeeper.rs index 6c6ca8ce..ba9c029d 100644 --- a/crates/housekeeper/src/housekeeper.rs +++ b/crates/housekeeper/src/housekeeper.rs @@ -7,7 +7,7 @@ use std::{ use ethereum_consensus::primitives::BlsPublicKey; use reth_primitives::{constants::EPOCH_SLOTS, revm_primitives::HashSet}; use tokio::{ - sync::{mpsc::channel, Mutex}, + sync::{broadcast, Mutex}, time::{sleep, Instant}, }; use tracing::{debug, error, info, warn}; @@ -27,7 +27,6 @@ use helix_datastore::Auctioneer; use crate::error::HousekeeperError; use uuid::Uuid; -pub(crate) const HEAD_EVENT_CHANNEL_SIZE: usize = 100; const PROPOSER_DUTIES_UPDATE_FREQ: u64 = 8; const TRUSTED_PROPOSERS_UPDATE_FREQ: u64 = 5; @@ -103,17 +102,24 @@ impl /// Start the Housekeeper service. pub async fn start( self: &SharedHousekeeper, + head_event_receiver: &mut broadcast::Receiver, ) -> Result<(), BeaconClientError> { let best_sync_status = self.beacon_client.best_sync_status().await?; self.process_new_slot(best_sync_status.head_slot).await; - - // Process all future head events. - let (head_event_sender, mut head_event_receiver) = - channel::(HEAD_EVENT_CHANNEL_SIZE); - self.beacon_client.subscribe_to_head_events(head_event_sender).await; - while let Some(head_event) = head_event_receiver.recv().await { - self.process_new_slot(head_event.slot).await; + loop { + match head_event_receiver.recv().await { + Ok(head_event) => { + self.process_new_slot(head_event.slot).await; + } + Err(broadcast::error::RecvError::Lagged(n)) => { + warn!("head events lagged by {n} events"); + } + Err(broadcast::error::RecvError::Closed) => { + error!("head event channel closed"); + break; + } + } } Ok(()) diff --git a/crates/housekeeper/src/housekeeper_tests.rs b/crates/housekeeper/src/housekeeper_tests.rs index eed3d43d..13c049b5 100644 --- a/crates/housekeeper/src/housekeeper_tests.rs +++ b/crates/housekeeper/src/housekeeper_tests.rs @@ -10,14 +10,16 @@ mod housekeeper_tests { }; // ++++ IMPORTS ++++ - use crate::housekeeper::{ - Housekeeper, HEAD_EVENT_CHANNEL_SIZE, SLEEP_DURATION_BEFORE_REFRESHING_VALIDATORS, + use crate::housekeeper::{Housekeeper, SLEEP_DURATION_BEFORE_REFRESHING_VALIDATORS}; + use helix_beacon_client::{ + mock_multi_beacon_client::MockMultiBeaconClient, MultiBeaconClientTrait, }; - use helix_beacon_client::mock_multi_beacon_client::MockMultiBeaconClient; use helix_common::{api::builder_api::BuilderGetValidatorsResponseEntry, ValidatorSummary}; use helix_database::MockDatabaseService; use helix_datastore::MockAuctioneer; - use tokio::task; + use tokio::{sync::broadcast, task}; + + const HEAD_EVENT_CHANNEL_SIZE: usize = 100; // ++++ HELPERS ++++ fn get_housekeeper() -> HelperVars { @@ -36,7 +38,7 @@ mod housekeeper_tests { proposer_duties_has_been_read.clone(), ); let auctioneer = MockAuctioneer::new(); - let housekeeper = Housekeeper::new(Arc::new(db), beacon_client, auctioneer); + let housekeeper = Housekeeper::new(Arc::new(db), beacon_client.clone(), auctioneer); HelperVars { housekeeper, @@ -46,14 +48,19 @@ mod housekeeper_tests { proposer_duties, state_validators_has_been_read, proposer_duties_has_been_read, + beacon_client, } } - fn start_housekeeper( + async fn start_housekeeper( housekeeper: Arc>, + beacon_client: MockMultiBeaconClient, ) { + let (head_event_sender, mut head_event_receiver) = + broadcast::channel(HEAD_EVENT_CHANNEL_SIZE); + beacon_client.subscribe_to_head_events(head_event_sender).await; task::spawn(async move { - housekeeper.start().await.unwrap(); + housekeeper.start(&mut head_event_receiver).await.unwrap(); }); } @@ -66,46 +73,31 @@ mod housekeeper_tests { pub proposer_duties: Arc>>, pub state_validators_has_been_read: Arc, pub proposer_duties_has_been_read: Arc, + pub beacon_client: MockMultiBeaconClient, } // ++++ TESTS ++++ - #[tokio::test] - async fn test_beacon_client_subscribes_to_head_events() { - let vars = get_housekeeper(); - start_housekeeper(vars.housekeeper.clone()); - - tokio::time::sleep(Duration::from_millis(100)).await; - - assert!(vars.subscribed_to_head_events.load(std::sync::atomic::Ordering::Relaxed)); - } - #[tokio::test] async fn test_head_event_is_processed_by_housekeeper() { let vars = get_housekeeper(); - start_housekeeper(vars.housekeeper.clone()); + start_housekeeper(vars.housekeeper.clone(), vars.beacon_client).await; tokio::time::sleep(Duration::from_millis(10)).await; - // assert that the capacity of the channel is correct at HEAD_EVENT_CHANNEL_SIZE - 1 as the + // assert that the len of the channel is correct at 1 as the // beacon client sends a dummy event - assert!( - vars.chan_head_events_capacity.load(std::sync::atomic::Ordering::Relaxed) == - HEAD_EVENT_CHANNEL_SIZE - 1 - ); + assert!(vars.chan_head_events_capacity.load(std::sync::atomic::Ordering::Relaxed) == 1); tokio::time::sleep(Duration::from_millis(100)).await; - // assert that the capacity of the channel is correct at HEAD_EVENT_CHANNEL_SIZE as the + // assert that the len of the channel is correct at 0 as the // housekeeper has processed the dummy event assert!(vars.subscribed_to_head_events.load(std::sync::atomic::Ordering::Relaxed)); - assert!( - vars.chan_head_events_capacity.load(std::sync::atomic::Ordering::Relaxed) == - HEAD_EVENT_CHANNEL_SIZE - ); + assert!(vars.chan_head_events_capacity.load(std::sync::atomic::Ordering::Relaxed) == 0); } #[tokio::test] async fn test_known_validators_are_set() { let vars = get_housekeeper(); - start_housekeeper(vars.housekeeper.clone()); + start_housekeeper(vars.housekeeper.clone(), vars.beacon_client).await; tokio::time::sleep( SLEEP_DURATION_BEFORE_REFRESHING_VALIDATORS + Duration::from_millis(100), ) @@ -120,7 +112,7 @@ mod housekeeper_tests { #[tokio::test] async fn test_proposer_duties_set() { let vars = get_housekeeper(); - start_housekeeper(vars.housekeeper.clone()); + start_housekeeper(vars.housekeeper.clone(), vars.beacon_client).await; tokio::time::sleep(Duration::from_millis(100)).await; assert!(vars.proposer_duties.lock().unwrap().len() == 1); @@ -132,7 +124,7 @@ mod housekeeper_tests { #[tokio::test] async fn test_proposer_duties_have_been_read() { let vars = get_housekeeper(); - start_housekeeper(vars.housekeeper.clone()); + start_housekeeper(vars.housekeeper.clone(), vars.beacon_client).await; tokio::time::sleep(Duration::from_millis(100)).await; assert!(vars.proposer_duties_has_been_read.load(std::sync::atomic::Ordering::Relaxed)); @@ -141,7 +133,7 @@ mod housekeeper_tests { #[tokio::test] async fn test_state_validators_have_been_read() { let vars = get_housekeeper(); - start_housekeeper(vars.housekeeper.clone()); + start_housekeeper(vars.housekeeper.clone(), vars.beacon_client).await; tokio::time::sleep( SLEEP_DURATION_BEFORE_REFRESHING_VALIDATORS + Duration::from_millis(100), )