v2.0: Scheduler Frequency Fixes (backport of #4545) (#4576)
* Change prio_graph_scheduler configuration: 1k max transactions scanned per pass, 256 look-ahead window

* Break loop on scanned transaction count

* Make the Hold decision behave the same as Consume during receive

* Receive a maximum of 5_000 packets per call (a loose max)

* Call receive_completed before process_transactions

---------

Co-authored-by: Andrew Fitzgerald <[email protected]>
mergify[bot] and apfitzge authored Jan 23, 2025
1 parent 4c817c2 commit 336796a
Showing 2 changed files with 14 additions and 14 deletions.
File 1 of 2:
@@ -43,7 +43,7 @@ impl PrioGraphScheduler {
             account_locks: ThreadAwareAccountLocks::new(num_threads),
             consume_work_senders,
             finished_consume_work_receiver,
-            look_ahead_window_size: 2048,
+            look_ahead_window_size: 256,
         }
     }

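The look-ahead window bounds how many of the highest-priority pending transactions are pulled into the prio-graph ahead of where the scheduler is currently working; cutting it from 2048 to 256 keeps each pass on a small slice of the queue. A minimal sketch of that idea under this reading, using illustrative stand-ins (a BinaryHeap for the container, a Vec for the graph) rather than the real agave types:

```rust
// Hedged sketch of a look-ahead window; names and types are stand-ins,
// not the PrioGraphScheduler API.
use std::collections::BinaryHeap;

fn fill_look_ahead(
    pending: &mut BinaryHeap<u64>, // priority-ordered transaction ids
    graph: &mut Vec<u64>,          // stand-in for the prio-graph
    look_ahead_window_size: usize, // 256 after this commit, 2048 before
) {
    // Pull ids into the graph only up to the window size; everything deeper
    // stays queued for a later pass, so one pass cannot grow unbounded.
    while graph.len() < look_ahead_window_size {
        match pending.pop() {
            Some(id) => graph.push(id),
            None => break,
        }
    }
}
```

The apparent trade-off: a smaller window means less dependency analysis per pass and a quicker return to the controller loop, at the cost of looking less far down the priority queue.
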
@@ -154,17 +154,19 @@ impl PrioGraphScheduler {

         let mut unblock_this_batch =
             Vec::with_capacity(self.consume_work_senders.len() * TARGET_NUM_TRANSACTIONS_PER_BATCH);
-        const MAX_TRANSACTIONS_PER_SCHEDULING_PASS: usize = 100_000;
+        const MAX_TRANSACTIONS_SCANNED_PER_SCHEDULING_PASS: usize = 1_000;
+        let mut num_scanned: usize = 0;
         let mut num_scheduled: usize = 0;
         let mut num_sent: usize = 0;
         let mut num_unschedulable: usize = 0;
-        while num_scheduled < MAX_TRANSACTIONS_PER_SCHEDULING_PASS {
+        while num_scheduled < MAX_TRANSACTIONS_SCANNED_PER_SCHEDULING_PASS {
             // If nothing is in the main-queue of the `PrioGraph` then there's nothing left to schedule.
             if prio_graph.is_empty() {
                 break;
             }

             while let Some(id) = prio_graph.pop() {
+                num_scanned += 1;
                 unblock_this_batch.push(id);

                 // Should always be in the container, during initial testing phase panic.
@@ -229,12 +231,12 @@ impl PrioGraphScheduler {
                                 break;
                             }
                         }
-
-                        if num_scheduled >= MAX_TRANSACTIONS_PER_SCHEDULING_PASS {
-                            break;
-                        }
                     }
                 }
+
+                if num_scanned >= MAX_TRANSACTIONS_SCANNED_PER_SCHEDULING_PASS {
+                    break;
+                }
             }

             // Send all non-empty batches
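
Taken together, the two hunks above move the per-pass budget from transactions scheduled to transactions scanned: every id popped from the prio-graph now counts toward a 1_000 cap whether or not it can be scheduled, so a pass dominated by unschedulable conflicts ends quickly instead of grinding toward the old 100_000 limit. A hedged sketch of the resulting loop shape; the real method also builds per-thread batches and takes account locks, all elided here, and `pop_unblocked` / `try_schedule` are illustrative closures, not agave APIs.

```rust
// Hedged sketch of the pass structure after this change. Only the
// scan/schedule counting is meant to mirror the diff above.
const MAX_TRANSACTIONS_SCANNED_PER_SCHEDULING_PASS: usize = 1_000;

fn schedule_pass(
    mut pop_unblocked: impl FnMut() -> Option<u64>, // next unblocked id, if any
    mut try_schedule: impl FnMut(u64) -> bool,      // true if the id was scheduled
) -> (usize, usize) {
    let mut num_scanned: usize = 0;
    let mut num_scheduled: usize = 0;

    // Outer condition mirrors the diff: it still compares num_scheduled, but
    // the inner scanned check is what now ends a busy pass.
    'pass: while num_scheduled < MAX_TRANSACTIONS_SCANNED_PER_SCHEDULING_PASS {
        let mut popped_any = false;
        while let Some(id) = pop_unblocked() {
            popped_any = true;
            num_scanned += 1; // counted even when the id turns out unschedulable
            if try_schedule(id) {
                num_scheduled += 1;
            }
            // New: break on ids scanned, so ~1_000 pops ends the pass even if
            // almost nothing could be scheduled.
            if num_scanned >= MAX_TRANSACTIONS_SCANNED_PER_SCHEDULING_PASS {
                break 'pass;
            }
        }
        if !popped_any {
            break; // nothing left that is unblocked
        }
    }
    (num_scanned, num_scheduled)
}
```

Combined with the 256-entry look-ahead window from the first hunk, this bounds how long a single scheduling pass can keep the thread away from the controller loop.
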
File 2 of 2:
@@ -119,8 +119,8 @@ impl SchedulerController {
             self.timing_metrics
                 .maybe_report_and_reset_slot(new_leader_slot);

-            self.process_transactions(&decision)?;
             self.receive_completed()?;
+            self.process_transactions(&decision)?;
             if !self.receive_and_buffer_packets(&decision) {
                 break;
             }
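
The reorder above makes the controller drain finished work before it schedules again, so account locks and container slots released by completed batches are available to the very next process_transactions call. A minimal sketch of the iteration order; the real controller's decision handling, metrics, and error types are left out, and the struct and method bodies here are placeholders.

```rust
// Hedged sketch of the controller loop ordering after this change; everything
// inside the methods is a placeholder, only the call order mirrors the diff.
struct ControllerSketch {
    connected: bool,
}

impl ControllerSketch {
    fn receive_completed(&mut self) {
        // drain finished batches, releasing their account locks / container slots
    }

    fn process_transactions(&mut self) {
        // run one scheduling pass over the (now less contended) container
    }

    fn receive_and_buffer_packets(&mut self) -> bool {
        // ingest new packets; false would mean the packet receiver disconnected
        self.connected
    }

    fn run(&mut self) {
        loop {
            // New order: completed work first, then schedule, then take in packets.
            self.receive_completed();
            self.process_transactions();
            if !self.receive_and_buffer_packets() {
                break;
            }
        }
    }
}
```
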
@@ -435,11 +435,11 @@ impl SchedulerController {

     /// Returns whether the packet receiver is still connected.
     fn receive_and_buffer_packets(&mut self, decision: &BufferedPacketsDecision) -> bool {
-        let remaining_queue_capacity = self.container.remaining_queue_capacity();
+        const MAX_RECEIVE_PACKETS: usize = 5_000;

         const MAX_PACKET_RECEIVE_TIME: Duration = Duration::from_millis(10);
         let (recv_timeout, should_buffer) = match decision {
-            BufferedPacketsDecision::Consume(_) => (
+            BufferedPacketsDecision::Consume(_) | BufferedPacketsDecision::Hold => (
                 if self.container.is_empty() {
                     MAX_PACKET_RECEIVE_TIME
                 } else {
@@ -448,14 +448,12 @@
                 true,
             ),
             BufferedPacketsDecision::Forward => (MAX_PACKET_RECEIVE_TIME, self.forwarder.is_some()),
-            BufferedPacketsDecision::ForwardAndHold | BufferedPacketsDecision::Hold => {
-                (MAX_PACKET_RECEIVE_TIME, true)
-            }
+            BufferedPacketsDecision::ForwardAndHold => (MAX_PACKET_RECEIVE_TIME, true),
         };

         let (received_packet_results, receive_time_us) = measure_us!(self
             .packet_receiver
-            .receive_packets(recv_timeout, remaining_queue_capacity, |packet| {
+            .receive_packets(recv_timeout, MAX_RECEIVE_PACKETS, |packet| {
                 packet.check_excessive_precompiles()?;
                 Ok(packet)
             }));
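
The last two hunks change packet intake in two ways: the receive call is capped at a fixed 5,000 packets instead of the container's remaining capacity, and a Hold decision now uses the same timeout-and-buffer path as Consume rather than sharing ForwardAndHold's. A hedged sketch of the decision handling after the change; the Consume payload and the receiver plumbing are omitted, and Duration::ZERO for the "container not empty" case is an assumption, since that branch lies outside the hunks shown above.

```rust
// Hedged sketch of the post-change decision handling; the enum is simplified
// and `receive_params` is an illustrative helper, not an agave function.
use std::time::Duration;

enum BufferedPacketsDecision {
    Consume, // payload elided
    Forward,
    ForwardAndHold,
    Hold,
}

const MAX_RECEIVE_PACKETS: usize = 5_000; // fixed, "loose" per-call cap
const MAX_PACKET_RECEIVE_TIME: Duration = Duration::from_millis(10);

// Returns (recv_timeout, max_packets, should_buffer) for one receive call.
fn receive_params(
    decision: &BufferedPacketsDecision,
    container_is_empty: bool,
    has_forwarder: bool,
) -> (Duration, usize, bool) {
    let (recv_timeout, should_buffer) = match decision {
        // Hold now takes the same branch as Consume: block briefly only when
        // nothing is buffered, and always keep what was received.
        BufferedPacketsDecision::Consume | BufferedPacketsDecision::Hold => (
            if container_is_empty {
                MAX_PACKET_RECEIVE_TIME
            } else {
                Duration::ZERO // assumed non-blocking when work is already buffered
            },
            true,
        ),
        BufferedPacketsDecision::Forward => (MAX_PACKET_RECEIVE_TIME, has_forwarder),
        BufferedPacketsDecision::ForwardAndHold => (MAX_PACKET_RECEIVE_TIME, true),
    };
    (recv_timeout, MAX_RECEIVE_PACKETS, should_buffer)
}
```
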
