From 336796a7184c6e8bbfdc3fea87b7a3278c1d5272 Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Wed, 22 Jan 2025 18:03:24 -0600 Subject: [PATCH] v2.0: Scheduler Frequency Fixes (backport of #4545) (#4576) * Change prio_graph_scheduler configurations for 1k maxs, 256 look ahead * Break loop on scanned transaction count * make Hold decision behave same as Consume during receive * receive maximum of 5_000 packets - loose max * receive_completed before process_transactions --------- Co-authored-by: Andrew Fitzgerald --- .../prio_graph_scheduler.rs | 16 +++++++++------- .../scheduler_controller.rs | 12 +++++------- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs b/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs index c640e88a178d22..632fdd8ace2fed 100644 --- a/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs +++ b/core/src/banking_stage/transaction_scheduler/prio_graph_scheduler.rs @@ -43,7 +43,7 @@ impl PrioGraphScheduler { account_locks: ThreadAwareAccountLocks::new(num_threads), consume_work_senders, finished_consume_work_receiver, - look_ahead_window_size: 2048, + look_ahead_window_size: 256, } } @@ -154,17 +154,19 @@ impl PrioGraphScheduler { let mut unblock_this_batch = Vec::with_capacity(self.consume_work_senders.len() * TARGET_NUM_TRANSACTIONS_PER_BATCH); - const MAX_TRANSACTIONS_PER_SCHEDULING_PASS: usize = 100_000; + const MAX_TRANSACTIONS_SCANNED_PER_SCHEDULING_PASS: usize = 1_000; + let mut num_scanned: usize = 0; let mut num_scheduled: usize = 0; let mut num_sent: usize = 0; let mut num_unschedulable: usize = 0; - while num_scheduled < MAX_TRANSACTIONS_PER_SCHEDULING_PASS { + while num_scheduled < MAX_TRANSACTIONS_SCANNED_PER_SCHEDULING_PASS { // If nothing is in the main-queue of the `PrioGraph` then there's nothing left to schedule. if prio_graph.is_empty() { break; } while let Some(id) = prio_graph.pop() { + num_scanned += 1; unblock_this_batch.push(id); // Should always be in the container, during initial testing phase panic. @@ -229,12 +231,12 @@ impl PrioGraphScheduler { break; } } - - if num_scheduled >= MAX_TRANSACTIONS_PER_SCHEDULING_PASS { - break; - } } } + + if num_scanned >= MAX_TRANSACTIONS_SCANNED_PER_SCHEDULING_PASS { + break; + } } // Send all non-empty batches diff --git a/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs b/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs index e08c8a6e008fce..b69a8c635844e4 100644 --- a/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs +++ b/core/src/banking_stage/transaction_scheduler/scheduler_controller.rs @@ -119,8 +119,8 @@ impl SchedulerController { self.timing_metrics .maybe_report_and_reset_slot(new_leader_slot); - self.process_transactions(&decision)?; self.receive_completed()?; + self.process_transactions(&decision)?; if !self.receive_and_buffer_packets(&decision) { break; } @@ -435,11 +435,11 @@ impl SchedulerController { /// Returns whether the packet receiver is still connected. fn receive_and_buffer_packets(&mut self, decision: &BufferedPacketsDecision) -> bool { - let remaining_queue_capacity = self.container.remaining_queue_capacity(); + const MAX_RECEIVE_PACKETS: usize = 5_000; const MAX_PACKET_RECEIVE_TIME: Duration = Duration::from_millis(10); let (recv_timeout, should_buffer) = match decision { - BufferedPacketsDecision::Consume(_) => ( + BufferedPacketsDecision::Consume(_) | BufferedPacketsDecision::Hold => ( if self.container.is_empty() { MAX_PACKET_RECEIVE_TIME } else { @@ -448,14 +448,12 @@ impl SchedulerController { true, ), BufferedPacketsDecision::Forward => (MAX_PACKET_RECEIVE_TIME, self.forwarder.is_some()), - BufferedPacketsDecision::ForwardAndHold | BufferedPacketsDecision::Hold => { - (MAX_PACKET_RECEIVE_TIME, true) - } + BufferedPacketsDecision::ForwardAndHold => (MAX_PACKET_RECEIVE_TIME, true), }; let (received_packet_results, receive_time_us) = measure_us!(self .packet_receiver - .receive_packets(recv_timeout, remaining_queue_capacity, |packet| { + .receive_packets(recv_timeout, MAX_RECEIVE_PACKETS, |packet| { packet.check_excessive_precompiles()?; Ok(packet) }));