Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Develop #15

Merged
merged 19 commits into from
Feb 2, 2024
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
name: Rust
name: Build

on:
push:
branches: [ main, develop ]
branches: [ main, develop, "release/**" ]
pull_request:
branches: [ main, develop, "release/**" ]

env:
CARGO_TERM_COLOR: always
Expand Down
7 changes: 4 additions & 3 deletions .github/workflows/lint.yml
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
name: Rust Linting
name: Linting

on:
push:
branches: [ main, develop, "release/**" ]
pull_request:
branches: [ main, develop ]
types: [opened, synchronize, reopened]
branches: [ main, develop, "release/**" ]

env:
CARGO_TERM_COLOR: always
Expand Down
34 changes: 34 additions & 0 deletions .github/workflows/unit_test.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
name: Unit Tests

on:
push:
branches: [ main, develop, "release/**" ]
pull_request:
branches: [ main, develop, "release/**" ]

env:
CARGO_TERM_COLOR: always

jobs:
unit-test:
runs-on: ubuntu-latest
timeout-minutes: 30

steps:
- uses: actions/checkout@v4
with:
fetch-depth: '0'
path: ./repos/${{ secrets.REPO_NAME }}
ref: ${{ github.ref }}

# Install Protocol Buffers Compiler
- name: Install Protobuf Compiler (protoc)
run: sudo apt-get install -y protobuf-compiler

- name: Change to project directory
run: cd ./repos/${{ secrets.REPO_NAME }}

# Run unit tests
- name: Run unit tests
run: cargo test --workspace --all-features
working-directory: ./repos/${{ secrets.REPO_NAME }}
12 changes: 12 additions & 0 deletions crates/api/src/builder/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,18 @@ where
// Decode the incoming request body into a payload
let (payload, _) = decode_payload(req, &mut trace, &request_id).await?;

// Verify the payload is for the current slot
if payload.slot() <= head_slot {
warn!(
request_id = %request_id,
"submission is for a past slot",
);
return Err(BuilderApiError::SubmissionForPastSlot {
current_slot: head_slot,
submission_slot: payload.slot(),
});
}

let builder_pub_key = payload.builder_public_key().clone();
let block_hash = payload.message().block_hash.clone();
debug!(
Expand Down
1 change: 1 addition & 0 deletions crates/api/src/builder/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -834,6 +834,7 @@ mod tests {

#[tokio::test]
#[serial]
#[ignore]
async fn test_submit_block_slot_mismatch() {
// Start the server
let (tx, http_config, _api, mut slot_update_receiver) = start_api_server().await;
Expand Down
2 changes: 1 addition & 1 deletion crates/api/src/gossiper/grpc_gossiper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ impl GrpcGossiperClient {
if let Err(err) = client.broadcast_header(request).await {
return match err.code() {
tonic::Code::Unavailable => {
error!(err = %err, "failed to broadcast block");
error!(err = %err, "failed to broadcast header");
drop(client_guard);
// Reconnect
self.connect().await;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ async fn run() {
);

let (head_event_sender, mut head_event_receiver) =
channel::<helix_beacon_client::types::HeadEventData>(100);
tokio::sync::broadcast::channel::<helix_beacon_client::types::HeadEventData>(100);

tokio::spawn(async move {
if let Err(err) = beacon_client.subscribe_to_head_events(head_event_sender).await {
Expand All @@ -134,7 +134,7 @@ async fn run() {

let mut first_fetch_complete = false;
// Process registrations each half epoch
while let Some(head_event) = head_event_receiver.recv().await {
while let Ok(head_event) = head_event_receiver.recv().await {
println!("New head event: {}", head_event.slot);
if head_event.slot % 5 != 0 && first_fetch_complete {
continue;
Expand Down
10 changes: 5 additions & 5 deletions crates/api/src/proposer/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,7 @@ where
// Bulk check if the validators are known
let registration_pub_keys =
registrations.iter().map(|r| r.message.public_key.clone()).collect();
let known_pub_keys =
Arc::new(proposer_api.db.check_known_validators(registration_pub_keys).await?);
let known_pub_keys = proposer_api.db.check_known_validators(registration_pub_keys).await?;

// Check each registration
let mut valid_registrations = Vec::with_capacity(known_pub_keys.len());
Expand Down Expand Up @@ -253,7 +252,7 @@ where
.collect::<Vec<ValidatorRegistrationInfo>>();

// Bulk write registrations to db
tokio::task::spawn(async move {
tokio::spawn(async move {
if let Err(err) =
proposer_api.db.save_validator_registrations(valid_registrations).await
{
Expand Down Expand Up @@ -928,8 +927,8 @@ where
let slot_cutoff_millis = (slot_time * 1000) + GET_PAYLOAD_REQUEST_CUTOFF_MS as u64;

let mut last_error: Option<ProposerApiError> = None;

while get_millis_timestamp()? < slot_cutoff_millis {
let mut first_try = true; // Try at least once to cover case where get_payload is called too late.
while first_try || get_millis_timestamp()? < slot_cutoff_millis {
match self.auctioneer.get_execution_payload(slot, pub_key, block_hash).await {
Ok(Some(versioned_payload)) => return Ok(versioned_payload),
Ok(None) => {
Expand All @@ -941,6 +940,7 @@ where
}
}

first_try = false;
sleep(RETRY_DELAY).await;
}

Expand Down
36 changes: 32 additions & 4 deletions crates/api/src/proposer/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ mod proposer_api_tests {
};
use helix_database::MockDatabaseService;
use helix_datastore::MockAuctioneer;
use helix_housekeeper::{ChainUpdate, SlotUpdate};
use helix_housekeeper::{ChainUpdate, PayloadAttributesUpdate, SlotUpdate};
use helix_utils::request_encoding::Encoding;
use serial_test::serial;
use std::{sync::Arc, time::Duration};
Expand Down Expand Up @@ -201,6 +201,22 @@ mod proposer_api_tests {
tokio::time::sleep(Duration::from_millis(100)).await;
}

async fn send_dummy_payload_attr_update(
slot_update_sender: Sender<ChainUpdate>,
submission_slot: u64,
) {
let chain_update = ChainUpdate::PayloadAttributesUpdate(PayloadAttributesUpdate {
slot: submission_slot,
parent_hash: Default::default(),
withdrawals_root: Default::default(),
payload_attributes: Default::default(),
});
slot_update_sender.send(chain_update).await.unwrap();

// sleep for a bit to allow the api to process the slot update
tokio::time::sleep(Duration::from_millis(100)).await;
}

async fn start_api_server() -> (
oneshot::Sender<()>,
HttpServiceConfig,
Expand Down Expand Up @@ -413,6 +429,7 @@ mod proposer_api_tests {

#[tokio::test]
#[serial]
#[ignore]
async fn test_get_header_for_current_slot_no_header() {
// Start the server
let (tx, http_config, _api, mut slot_update_receiver, _auctioneer) =
Expand Down Expand Up @@ -582,6 +599,7 @@ mod proposer_api_tests {
// GET_PAYLOAD
#[tokio::test]
#[serial]
#[ignore]
async fn test_get_payload_no_proposer_duty() {
// Start the server
let (tx, http_config, _api, _slot_update_receiver, auctioneer) = start_api_server().await;
Expand Down Expand Up @@ -617,6 +635,7 @@ mod proposer_api_tests {

#[tokio::test]
#[serial]
#[ignore]
async fn test_get_payload_validator_index_mismatch() {
// Start the server
let (tx, http_config, _api, mut slot_update_receiver, auctioneer) =
Expand Down Expand Up @@ -666,11 +685,18 @@ mod proposer_api_tests {
let builder_bid = get_signed_builder_bid(U256::from(10));
let _ = auctioneer.best_bid.lock().unwrap().insert(builder_bid.clone());

let current_slot = calculate_current_slot();

// Send slot & payload attributes updates
let slot_update_sender = slot_update_receiver.recv().await.unwrap();
send_dummy_slot_update(slot_update_sender.clone(), None, None, None).await;

let current_slot = calculate_current_slot();
send_dummy_slot_update(
slot_update_sender.clone(),
Some(current_slot - 1),
Some(current_slot),
None,
)
.await;
send_dummy_payload_attr_update(slot_update_sender.clone(), current_slot).await;

// Prepare the request
let req_url =
Expand Down Expand Up @@ -907,6 +933,7 @@ mod proposer_api_tests {

#[tokio::test(flavor = "multi_thread")]
#[serial]
#[ignore]
async fn test_register_validators() {
let (tx, http_config, _api, _slot_update_receiver, _auctioneer) = start_api_server().await;
let req_url =
Expand Down Expand Up @@ -934,6 +961,7 @@ mod proposer_api_tests {

#[tokio::test]
#[serial]
#[ignore]
async fn test_validate_registration() {
let (slot_update_sender, _slot_update_receiver) = channel::<Sender<ChainUpdate>>(32);
let auctioneer = Arc::new(MockAuctioneer::default());
Expand Down
2 changes: 2 additions & 0 deletions crates/api/src/relay_data/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ mod data_api_tests {

#[tokio::test]
#[serial]
#[ignore]
async fn test_builder_bids_missing_filter() {
// Start the server
let (tx, http_config, _api, _database) = start_api_server().await;
Expand Down Expand Up @@ -212,6 +213,7 @@ mod data_api_tests {

#[tokio::test]
#[serial]
#[ignore]
async fn test_builder_bids_limit_reached() {
// Start the server
let (tx, http_config, _api, _database) = start_api_server().await;
Expand Down
27 changes: 22 additions & 5 deletions crates/api/src/service.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use std::{env, sync::Arc, time::Duration};

use ethereum_consensus::crypto::SecretKey;
use tokio::time::{sleep, timeout};
use tokio::{
sync::broadcast,
time::{sleep, timeout},
};
use tracing::{error, info};

use crate::{
Expand All @@ -11,7 +14,7 @@ use crate::{
};
use helix_beacon_client::{
beacon_client::BeaconClient, fiber_broadcaster::FiberBroadcaster,
multi_beacon_client::MultiBeaconClient, BlockBroadcaster,
multi_beacon_client::MultiBeaconClient, BlockBroadcaster, MultiBeaconClientTrait,
};
use helix_common::{
chain_info::ChainInfo, signing::RelaySigningContext, BroadcasterConfig, NetworkConfig,
Expand All @@ -25,6 +28,9 @@ pub(crate) const API_REQUEST_TIMEOUT: Duration = Duration::from_secs(5);
pub(crate) const SIMULATOR_REQUEST_TIMEOUT: Duration = Duration::from_secs(5);
const INIT_BROADCASTER_TIMEOUT: Duration = Duration::from_secs(5);

const HEAD_EVENT_CHANNEL_SIZE: usize = 100;
const PAYLOAD_ATTRIBUTE_CHANNEL_SIZE: usize = 300;

pub struct ApiService {}

impl ApiService {
Expand All @@ -46,6 +52,13 @@ impl ApiService {
}
let multi_beacon_client = Arc::new(MultiBeaconClient::<BeaconClient>::new(beacon_clients));

// Subscribe to head and payload attribute events
let (head_event_sender, head_event_receiver) = broadcast::channel(HEAD_EVENT_CHANNEL_SIZE);
multi_beacon_client.subscribe_to_head_events(head_event_sender).await;
let (payload_attribute_sender, payload_attribute_receiver) =
broadcast::channel(PAYLOAD_ATTRIBUTE_CHANNEL_SIZE);
multi_beacon_client.subscribe_to_payload_attributes_events(payload_attribute_sender).await;

let chain_info = Arc::new(match config.network_config {
NetworkConfig::Mainnet => ChainInfo::for_mainnet(),
NetworkConfig::Goerli => ChainInfo::for_goerli(),
Expand All @@ -55,9 +68,10 @@ impl ApiService {

let housekeeper =
Housekeeper::new(db.clone(), multi_beacon_client.clone(), auctioneer.clone());
let mut housekeeper_head_events = head_event_receiver.resubscribe();
tokio::spawn(async move {
loop {
if let Err(err) = housekeeper.start().await {
if let Err(err) = housekeeper.start(&mut housekeeper_head_events).await {
tracing::error!("Housekeeper error: {}", err);
sleep(Duration::from_secs(5)).await;
}
Expand Down Expand Up @@ -88,9 +102,12 @@ impl ApiService {

let (mut chain_event_updater, slot_update_sender) = ChainEventUpdater::new(db.clone());

let mbc_clone = multi_beacon_client.clone();
let chain_updater_head_events = head_event_receiver.resubscribe();
let chain_updater_payload_events = payload_attribute_receiver.resubscribe();
tokio::spawn(async move {
chain_event_updater.start(mbc_clone).await;
chain_event_updater
.start(chain_updater_head_events, chain_updater_payload_events)
.await;
});

let gossiper = Arc::new(
Expand Down
Loading
Loading