Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parallel builder polish #282

Merged
merged 3 commits into from
Dec 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@ use reth_provider::{BlockReader, DatabaseProviderFactory, StateProviderFactory};
use std::{marker::PhantomData, sync::Arc, time::Instant};
use time::OffsetDateTime;
use tokio_util::sync::CancellationToken;
use tracing::{trace, warn};
use tracing::{info_span, trace};

use crate::{
building::{
builders::{
block_building_helper::{BlockBuildingHelper, BlockBuildingHelperFromProvider},
UnfinishedBlockBuildingSink,
handle_building_error, UnfinishedBlockBuildingSink,
},
BlockBuildingContext,
},
Expand Down Expand Up @@ -104,7 +104,9 @@ where
}
if self.best_results.get_number_of_orders() > 0 {
let orders_closed_at = OffsetDateTime::now_utc();
self.try_build_block(orders_closed_at);
if !self.try_build_block(orders_closed_at) {
break;
}
}
}
trace!(
Expand All @@ -113,12 +115,12 @@ where
);
}

/// Attempts to build a new block if not already building.
/// Attempts to build a new block if not already building. Returns if block building should continue.
///
/// # Arguments
///
/// * `orders_closed_at` - The timestamp when orders were closed.
fn try_build_block(&mut self, orders_closed_at: OffsetDateTime) {
fn try_build_block(&mut self, orders_closed_at: OffsetDateTime) -> bool {
let time_start = Instant::now();

let current_best_results = self.best_results.clone();
Expand All @@ -128,7 +130,7 @@ where
// Check if version has incremented
if let Some(last_version) = self.last_version {
if version == last_version {
return;
return true;
}
}
self.last_version = Some(version);
Expand All @@ -140,18 +142,18 @@ where
);

if best_orderings_per_group.is_empty() {
return;
return true;
}

match self.build_new_block(&mut best_orderings_per_group, orders_closed_at) {
Ok(new_block) => {
if let Ok(value) = new_block.true_block_value() {
trace!(
"Parallel builder run id {}: Built new block with results version {:?} and profit: {:?} in {:?} ms",
self.run_id,
version,
format_ether(value),
time_start.elapsed().as_millis()
run_id = self.run_id,
version = version,
time_ms = time_start.elapsed().as_millis(),
profit = format_ether(value),
"Parallel builder built new block",
);

if new_block.built_block_trace().got_no_signer_error {
Expand All @@ -163,11 +165,15 @@ where
}
}
}
Err(e) => {
warn!("Parallel builder run id {}: Failed to build new block with results version {:?}: {:?}", self.run_id, version, e);
Err(err) => {
let _span = info_span!("Parallel builder failed to build new block",run_id = self.run_id,version = version,err=?err).entered();
if !handle_building_error(err) {
return false;
}
}
}
self.run_id += 1;
true
}

/// Builds a new block using the best results from each group.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ where
self.thread_pool.spawn(move || {
while !cancellation_token.is_cancelled() {
if let Some(task) = task_queue.pop() {
if cancellation_token.is_cancelled() {
return;
}
let task_start = Instant::now();
if let Ok((task_id, result)) = Self::process_task(
task,
Expand All @@ -92,6 +95,7 @@ where
time_taken_ms = %task_start.elapsed().as_millis(),
"Conflict resolving: failed to send group result"
);
return;
}
}
}
Expand All @@ -100,7 +104,7 @@ where
});
}

pub fn process_task(
fn process_task(
task: ConflictTask,
ctx: &BlockBuildingContext,
provider: &P,
Expand Down Expand Up @@ -131,8 +135,9 @@ where
}
Err(err) => {
warn!(
"Error running conflict task for group_idx {}: {:?}",
task_id, err
group_id = task_id,
err = ?err,
"Error running conflict task for group_idx",
);
Err(err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ impl ConflictTaskGenerator {
.cloned()
.collect();

trace!("Removing subset groups: {:?}", subset_ids);
trace!(groups = ?subset_ids,"Removing subset groups");
for id in subset_ids {
self.existing_groups.remove(&id);
self.cancel_tasks_for_group(id);
Expand Down Expand Up @@ -158,10 +158,12 @@ impl ConflictTaskGenerator {
TaskPriority::High
};
trace!(
"Processing multi order group {group_id} with {} orders, {} profit with priority {:?}",
new_group.orders.len(),
format_ether(self.sum_top_n_profits(&new_group.orders, new_group.orders.len())),
priority.display()
group = group_id,
order_count = new_group.orders.len(),
profit =
format_ether(self.sum_top_n_profits(&new_group.orders, new_group.orders.len())),
priority = priority.display(),
"Processing multi order group"
);
if self.existing_groups.contains_key(&group_id) {
self.update_tasks(group_id, new_group, priority);
Expand All @@ -186,8 +188,9 @@ impl ConflictTaskGenerator {
.send((group_id, (sequence_of_orders, group.clone())))
{
warn!(
"Failed to send single order result for group {}: {:?}",
group_id, e
error = ?e,
group_id,
"Failed to send single order result for group",
);
}
}
Expand Down Expand Up @@ -294,9 +297,9 @@ impl ConflictTaskGenerator {
priority: TaskPriority,
) {
trace!(
"Updating tasks for group {} with priority {:?}",
group_id,
priority.display()
group = group_id,
priority = priority.display(),
"Updating tasks",
);
// Cancel existing tasks for this grou
self.cancel_tasks_for_group(group_id);
Expand Down
Loading