Skip to content

Commit

Permalink
Merge pull request #15 from gattaca-com/develop
Browse files Browse the repository at this point in the history
Develop
  • Loading branch information
gd-0 authored Feb 2, 2024
2 parents 6a945fa + 40a4212 commit 84d31a6
Show file tree
Hide file tree
Showing 22 changed files with 248 additions and 137 deletions.
6 changes: 4 additions & 2 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
name: Rust
name: Build

on:
push:
branches: [ main, develop ]
branches: [ main, develop, "release/**" ]
pull_request:
branches: [ main, develop, "release/**" ]

env:
CARGO_TERM_COLOR: always
Expand Down
7 changes: 4 additions & 3 deletions .github/workflows/lint.yml
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
name: Rust Linting
name: Linting

on:
push:
branches: [ main, develop, "release/**" ]
pull_request:
branches: [ main, develop ]
types: [opened, synchronize, reopened]
branches: [ main, develop, "release/**" ]

env:
CARGO_TERM_COLOR: always
Expand Down
34 changes: 34 additions & 0 deletions .github/workflows/unit_test.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
name: Unit Tests

on:
push:
branches: [ main, develop, "release/**" ]
pull_request:
branches: [ main, develop, "release/**" ]

env:
CARGO_TERM_COLOR: always

jobs:
unit-test:
runs-on: ubuntu-latest
timeout-minutes: 30

steps:
- uses: actions/checkout@v4
with:
fetch-depth: '0'
path: ./repos/${{ secrets.REPO_NAME }}
ref: ${{ github.ref }}

# Install Protocol Buffers Compiler
- name: Install Protobuf Compiler (protoc)
run: sudo apt-get install -y protobuf-compiler

- name: Change to project directory
run: cd ./repos/${{ secrets.REPO_NAME }}

# Run unit tests
- name: Run unit tests
run: cargo test --workspace --all-features
working-directory: ./repos/${{ secrets.REPO_NAME }}
20 changes: 20 additions & 0 deletions crates/api/src/builder/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -581,6 +581,18 @@ where
.await
.map_err(|_| BuilderApiError::InternalError)?;

// Verify the payload is for the current slot
if payload.slot() <= head_slot {
warn!(
request_id = %request_id,
"submission is for a past slot",
);
return Err(BuilderApiError::SubmissionForPastSlot {
current_slot: head_slot,
submission_slot: payload.slot(),
});
}

// Fetch builder info
let builder_info = api.fetch_builder_info(payload.builder_public_key()).await;

Expand Down Expand Up @@ -1283,6 +1295,14 @@ where
BuilderApiError::ProposerDutyNotFound
})?;

if next_proposer_duty.slot != slot {
warn!(request_id = %request_id, "request for past slot");
return Err(BuilderApiError::SubmissionForPastSlot {
current_slot: next_proposer_duty.slot,
submission_slot: slot,
})
}

let payload_attributes =
self.payload_attributes.read().await.get(parent_hash).cloned().ok_or_else(|| {
warn!(request_id = %request_id, "payload attributes not yet known");
Expand Down
1 change: 1 addition & 0 deletions crates/api/src/builder/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -834,6 +834,7 @@ mod tests {

#[tokio::test]
#[serial]
#[ignore]
async fn test_submit_block_slot_mismatch() {
// Start the server
let (tx, http_config, _api, mut slot_update_receiver) = start_api_server().await;
Expand Down
2 changes: 1 addition & 1 deletion crates/api/src/gossiper/grpc_gossiper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ impl GrpcGossiperClient {
if let Err(err) = client.broadcast_header(request).await {
return match err.code() {
tonic::Code::Unavailable => {
error!(err = %err, "failed to broadcast block");
error!(err = %err, "failed to broadcast header");
drop(client_guard);
// Reconnect
self.connect().await;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ async fn run() {
);

let (head_event_sender, mut head_event_receiver) =
channel::<helix_beacon_client::types::HeadEventData>(100);
tokio::sync::broadcast::channel::<helix_beacon_client::types::HeadEventData>(100);

tokio::spawn(async move {
if let Err(err) = beacon_client.subscribe_to_head_events(head_event_sender).await {
Expand All @@ -134,7 +134,7 @@ async fn run() {

let mut first_fetch_complete = false;
// Process registrations each half epoch
while let Some(head_event) = head_event_receiver.recv().await {
while let Ok(head_event) = head_event_receiver.recv().await {
println!("New head event: {}", head_event.slot);
if head_event.slot % 5 != 0 && first_fetch_complete {
continue;
Expand Down
10 changes: 5 additions & 5 deletions crates/api/src/proposer/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,7 @@ where
// Bulk check if the validators are known
let registration_pub_keys =
registrations.iter().map(|r| r.message.public_key.clone()).collect();
let known_pub_keys =
Arc::new(proposer_api.db.check_known_validators(registration_pub_keys).await?);
let known_pub_keys = proposer_api.db.check_known_validators(registration_pub_keys).await?;

// Check each registration
let mut valid_registrations = Vec::with_capacity(known_pub_keys.len());
Expand Down Expand Up @@ -253,7 +252,7 @@ where
.collect::<Vec<ValidatorRegistrationInfo>>();

// Bulk write registrations to db
tokio::task::spawn(async move {
tokio::spawn(async move {
if let Err(err) =
proposer_api.db.save_validator_registrations(valid_registrations).await
{
Expand Down Expand Up @@ -928,8 +927,8 @@ where
let slot_cutoff_millis = (slot_time * 1000) + GET_PAYLOAD_REQUEST_CUTOFF_MS as u64;

let mut last_error: Option<ProposerApiError> = None;

while get_millis_timestamp()? < slot_cutoff_millis {
let mut first_try = true; // Try at least once to cover case where get_payload is called too late.
while first_try || get_millis_timestamp()? < slot_cutoff_millis {
match self.auctioneer.get_execution_payload(slot, pub_key, block_hash).await {
Ok(Some(versioned_payload)) => return Ok(versioned_payload),
Ok(None) => {
Expand All @@ -941,6 +940,7 @@ where
}
}

first_try = false;
sleep(RETRY_DELAY).await;
}

Expand Down
36 changes: 32 additions & 4 deletions crates/api/src/proposer/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ mod proposer_api_tests {
};
use helix_database::MockDatabaseService;
use helix_datastore::MockAuctioneer;
use helix_housekeeper::{ChainUpdate, SlotUpdate};
use helix_housekeeper::{ChainUpdate, PayloadAttributesUpdate, SlotUpdate};
use helix_utils::request_encoding::Encoding;
use serial_test::serial;
use std::{sync::Arc, time::Duration};
Expand Down Expand Up @@ -201,6 +201,22 @@ mod proposer_api_tests {
tokio::time::sleep(Duration::from_millis(100)).await;
}

async fn send_dummy_payload_attr_update(
slot_update_sender: Sender<ChainUpdate>,
submission_slot: u64,
) {
let chain_update = ChainUpdate::PayloadAttributesUpdate(PayloadAttributesUpdate {
slot: submission_slot,
parent_hash: Default::default(),
withdrawals_root: Default::default(),
payload_attributes: Default::default(),
});
slot_update_sender.send(chain_update).await.unwrap();

// sleep for a bit to allow the api to process the slot update
tokio::time::sleep(Duration::from_millis(100)).await;
}

async fn start_api_server() -> (
oneshot::Sender<()>,
HttpServiceConfig,
Expand Down Expand Up @@ -413,6 +429,7 @@ mod proposer_api_tests {

#[tokio::test]
#[serial]
#[ignore]
async fn test_get_header_for_current_slot_no_header() {
// Start the server
let (tx, http_config, _api, mut slot_update_receiver, _auctioneer) =
Expand Down Expand Up @@ -582,6 +599,7 @@ mod proposer_api_tests {
// GET_PAYLOAD
#[tokio::test]
#[serial]
#[ignore]
async fn test_get_payload_no_proposer_duty() {
// Start the server
let (tx, http_config, _api, _slot_update_receiver, auctioneer) = start_api_server().await;
Expand Down Expand Up @@ -617,6 +635,7 @@ mod proposer_api_tests {

#[tokio::test]
#[serial]
#[ignore]
async fn test_get_payload_validator_index_mismatch() {
// Start the server
let (tx, http_config, _api, mut slot_update_receiver, auctioneer) =
Expand Down Expand Up @@ -666,11 +685,18 @@ mod proposer_api_tests {
let builder_bid = get_signed_builder_bid(U256::from(10));
let _ = auctioneer.best_bid.lock().unwrap().insert(builder_bid.clone());

let current_slot = calculate_current_slot();

// Send slot & payload attributes updates
let slot_update_sender = slot_update_receiver.recv().await.unwrap();
send_dummy_slot_update(slot_update_sender.clone(), None, None, None).await;

let current_slot = calculate_current_slot();
send_dummy_slot_update(
slot_update_sender.clone(),
Some(current_slot - 1),
Some(current_slot),
None,
)
.await;
send_dummy_payload_attr_update(slot_update_sender.clone(), current_slot).await;

// Prepare the request
let req_url =
Expand Down Expand Up @@ -907,6 +933,7 @@ mod proposer_api_tests {

#[tokio::test(flavor = "multi_thread")]
#[serial]
#[ignore]
async fn test_register_validators() {
let (tx, http_config, _api, _slot_update_receiver, _auctioneer) = start_api_server().await;
let req_url =
Expand Down Expand Up @@ -934,6 +961,7 @@ mod proposer_api_tests {

#[tokio::test]
#[serial]
#[ignore]
async fn test_validate_registration() {
let (slot_update_sender, _slot_update_receiver) = channel::<Sender<ChainUpdate>>(32);
let auctioneer = Arc::new(MockAuctioneer::default());
Expand Down
2 changes: 2 additions & 0 deletions crates/api/src/relay_data/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ mod data_api_tests {

#[tokio::test]
#[serial]
#[ignore]
async fn test_builder_bids_missing_filter() {
// Start the server
let (tx, http_config, _api, _database) = start_api_server().await;
Expand Down Expand Up @@ -212,6 +213,7 @@ mod data_api_tests {

#[tokio::test]
#[serial]
#[ignore]
async fn test_builder_bids_limit_reached() {
// Start the server
let (tx, http_config, _api, _database) = start_api_server().await;
Expand Down
27 changes: 22 additions & 5 deletions crates/api/src/service.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use std::{env, sync::Arc, time::Duration};

use ethereum_consensus::crypto::SecretKey;
use tokio::time::{sleep, timeout};
use tokio::{
sync::broadcast,
time::{sleep, timeout},
};
use tracing::{error, info};

use crate::{
Expand All @@ -11,7 +14,7 @@ use crate::{
};
use helix_beacon_client::{
beacon_client::BeaconClient, fiber_broadcaster::FiberBroadcaster,
multi_beacon_client::MultiBeaconClient, BlockBroadcaster,
multi_beacon_client::MultiBeaconClient, BlockBroadcaster, MultiBeaconClientTrait,
};
use helix_common::{
chain_info::ChainInfo, signing::RelaySigningContext, BroadcasterConfig, NetworkConfig,
Expand All @@ -25,6 +28,9 @@ pub(crate) const API_REQUEST_TIMEOUT: Duration = Duration::from_secs(5);
pub(crate) const SIMULATOR_REQUEST_TIMEOUT: Duration = Duration::from_secs(5);
const INIT_BROADCASTER_TIMEOUT: Duration = Duration::from_secs(30);

const HEAD_EVENT_CHANNEL_SIZE: usize = 100;
const PAYLOAD_ATTRIBUTE_CHANNEL_SIZE: usize = 300;

pub struct ApiService {}

impl ApiService {
Expand All @@ -46,6 +52,13 @@ impl ApiService {
}
let multi_beacon_client = Arc::new(MultiBeaconClient::<BeaconClient>::new(beacon_clients));

// Subscribe to head and payload attribute events
let (head_event_sender, head_event_receiver) = broadcast::channel(HEAD_EVENT_CHANNEL_SIZE);
multi_beacon_client.subscribe_to_head_events(head_event_sender).await;
let (payload_attribute_sender, payload_attribute_receiver) =
broadcast::channel(PAYLOAD_ATTRIBUTE_CHANNEL_SIZE);
multi_beacon_client.subscribe_to_payload_attributes_events(payload_attribute_sender).await;

let chain_info = Arc::new(match config.network_config {
NetworkConfig::Mainnet => ChainInfo::for_mainnet(),
NetworkConfig::Goerli => ChainInfo::for_goerli(),
Expand All @@ -55,9 +68,10 @@ impl ApiService {

let housekeeper =
Housekeeper::new(db.clone(), multi_beacon_client.clone(), auctioneer.clone());
let mut housekeeper_head_events = head_event_receiver.resubscribe();
tokio::spawn(async move {
loop {
if let Err(err) = housekeeper.start().await {
if let Err(err) = housekeeper.start(&mut housekeeper_head_events).await {
tracing::error!("Housekeeper error: {}", err);
sleep(Duration::from_secs(5)).await;
}
Expand Down Expand Up @@ -88,9 +102,12 @@ impl ApiService {

let (mut chain_event_updater, slot_update_sender) = ChainEventUpdater::new(db.clone());

let mbc_clone = multi_beacon_client.clone();
let chain_updater_head_events = head_event_receiver.resubscribe();
let chain_updater_payload_events = payload_attribute_receiver.resubscribe();
tokio::spawn(async move {
chain_event_updater.start(mbc_clone).await;
chain_event_updater
.start(chain_updater_head_events, chain_updater_payload_events)
.await;
});

let gossiper = Arc::new(
Expand Down
Loading

0 comments on commit 84d31a6

Please sign in to comment.