Skip to content

Commit

Permalink
Update orchestra
Browse files Browse the repository at this point in the history
  • Loading branch information
AndreiEres committed Jun 28, 2024
1 parent 18a6a56 commit 33b293f
Show file tree
Hide file tree
Showing 7 changed files with 105 additions and 17 deletions.
24 changes: 18 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -855,7 +855,7 @@ num-rational = { version = "0.4.1" }
num-traits = { version = "0.2.17", default-features = false }
num_cpus = { version = "1.13.1" }
once_cell = { version = "1.19.0" }
orchestra = { version = "0.3.5", default-features = false }
orchestra = { version = "0.4.0", default-features = false }
pallet-alliance = { path = "substrate/frame/alliance", default-features = false }
pallet-asset-conversion = { path = "substrate/frame/asset-conversion", default-features = false }
pallet-asset-conversion-ops = { path = "substrate/frame/asset-conversion/ops", default-features = false }
Expand Down
13 changes: 12 additions & 1 deletion polkadot/node/malus/src/interceptor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ where
<OutgoingMessage as TryFrom<overseer::AllMessages>>::Error: std::fmt::Debug,
{
async fn send_message(&mut self, msg: OutgoingMessage) {
self.send_message_with_priority::<overseer::NormalPriority>(msg).await;
}

async fn send_message_with_priority<P: Priority>(&mut self, msg: OutgoingMessage) {
let msg = <
<<Fil as MessageInterceptor<Sender>>::Message as overseer::AssociateOutgoing
>::OutgoingMessages as From<OutgoingMessage>>::from(msg);
Expand All @@ -103,7 +107,14 @@ where
}
}

fn try_send_message(&mut self, msg: OutgoingMessage) -> Result<(), TrySendError<OutgoingMessage>> {
fn try_send_message(
&mut self,
msg: OutgoingMessage,
) -> Result<(), polkadot_node_subsystem_util::metered::TrySendError<OutgoingMessage>> {
self.try_send_message_with_priority::<overseer::NormalPriority>(msg)
}

fn try_send_message_with_priority<P: Priority>(&mut self, msg: OutgoingMessage) -> Result<(), TrySendError<OutgoingMessage>> {
let msg = <
<<Fil as MessageInterceptor<Sender>>::Message as overseer::AssociateOutgoing
>::OutgoingMessages as From<OutgoingMessage>>::from(msg);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ impl TestState {
// Test will fail if this does not happen until timeout.
let mut remaining_stores = self.valid_chunks.len();

let TestSubsystemContextHandle { tx, mut rx } = harness.virtual_overseer;
let TestSubsystemContextHandle { tx, mut rx, .. } = harness.virtual_overseer;

// Spawning necessary as incoming queue can only hold a single item, we don't want to dead
// lock ;-)
Expand Down
11 changes: 11 additions & 0 deletions polkadot/node/network/bridge/src/rx/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -545,6 +545,8 @@ async fn assert_sends_validation_event_to_all(
event: NetworkBridgeEvent<net_protocol::VersionedValidationProtocol>,
virtual_overseer: &mut TestSubsystemContextHandle<NetworkBridgeRxMessage>,
) {
let is_peer_view_change = matches!(event, NetworkBridgeEvent::PeerViewChange(..));

// Ordering must be consistent across:
// `fn dispatch_validation_event_to_all_unbounded`
// `dispatch_validation_events_to_all`
Expand Down Expand Up @@ -575,6 +577,15 @@ async fn assert_sends_validation_event_to_all(
GossipSupportMessage::NetworkBridgeUpdate(e)
) if e == event.focus().expect("could not focus message")
);

// Peer view changes sent with high priority.
if is_peer_view_change {
assert_eq!(
virtual_overseer.message_counter.with_high_priority(),
if is_peer_view_change { 4 } else { 0 }
);
virtual_overseer.message_counter.reset();
}
}

async fn assert_sends_collation_event_to_all(
Expand Down
9 changes: 5 additions & 4 deletions polkadot/node/overseer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,11 @@ pub use polkadot_node_metrics::{

pub use orchestra as gen;
pub use orchestra::{
contextbounds, orchestra, subsystem, FromOrchestra, MapSubsystem, MessagePacket,
OrchestraError as OverseerError, SignalsReceived, Spawner, Subsystem, SubsystemContext,
SubsystemIncomingMessages, SubsystemInstance, SubsystemMeterReadouts, SubsystemMeters,
SubsystemSender, TimeoutExt, ToOrchestra, TrySendError,
contextbounds, orchestra, subsystem, FromOrchestra, HighPriority, MapSubsystem, MessagePacket,
NormalPriority, OrchestraError as OverseerError, Priority, PriorityLevel, SignalsReceived,
Spawner, Subsystem, SubsystemContext, SubsystemIncomingMessages, SubsystemInstance,
SubsystemMeterReadouts, SubsystemMeters, SubsystemSender, TimeoutExt, ToOrchestra,
TrySendError,
};

#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
Expand Down
61 changes: 57 additions & 4 deletions polkadot/node/subsystem-test-helpers/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use std::{
convert::Infallible,
future::Future,
pin::Pin,
sync::Arc,
sync::{atomic::AtomicUsize, Arc},
task::{Context, Poll, Waker},
time::Duration,
};
Expand Down Expand Up @@ -146,12 +146,13 @@ pub fn single_item_sink<T>() -> (SingleItemSink<T>, SingleItemStream<T>) {
#[derive(Clone)]
pub struct TestSubsystemSender {
tx: mpsc::UnboundedSender<AllMessages>,
message_counter: MessageCounter,
}

/// Construct a sender/receiver pair.
pub fn sender_receiver() -> (TestSubsystemSender, mpsc::UnboundedReceiver<AllMessages>) {
let (tx, rx) = mpsc::unbounded();
(TestSubsystemSender { tx }, rx)
(TestSubsystemSender { tx, message_counter: MessageCounter::default() }, rx)
}

#[async_trait::async_trait]
Expand All @@ -161,13 +162,26 @@ where
OutgoingMessage: Send + 'static,
{
async fn send_message(&mut self, msg: OutgoingMessage) {
self.send_message_with_priority::<overseer::NormalPriority>(msg).await;
}

async fn send_message_with_priority<P: overseer::Priority>(&mut self, msg: OutgoingMessage) {
self.message_counter.increment(P::priority());
self.tx.send(msg.into()).await.expect("test overseer no longer live");
}

fn try_send_message(
&mut self,
msg: OutgoingMessage,
) -> Result<(), TrySendError<OutgoingMessage>> {
self.try_send_message_with_priority::<overseer::NormalPriority>(msg)
}

fn try_send_message_with_priority<P: overseer::Priority>(
&mut self,
msg: OutgoingMessage,
) -> Result<(), TrySendError<OutgoingMessage>> {
self.message_counter.increment(P::priority());
self.tx.unbounded_send(msg.into()).expect("test overseer no longer live");
Ok(())
}
Expand Down Expand Up @@ -277,6 +291,9 @@ pub struct TestSubsystemContextHandle<M> {

/// Direct access to the receiver.
pub rx: mpsc::UnboundedReceiver<AllMessages>,

/// Message counter over subsystems.
pub message_counter: MessageCounter,
}

impl<M> TestSubsystemContextHandle<M> {
Expand Down Expand Up @@ -322,6 +339,34 @@ pub fn make_subsystem_context<M, S>(
make_buffered_subsystem_context(spawner, 0)
}

/// Message counter over subsystems.
#[derive(Default, Clone)]
pub struct MessageCounter {
total: Arc<AtomicUsize>,
with_high_priority: Arc<AtomicUsize>,
}

impl MessageCounter {
/// Increment the message counter.
pub fn increment(&mut self, priority_level: overseer::PriorityLevel) {
self.total.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
if matches!(priority_level, overseer::PriorityLevel::High) {
self.with_high_priority.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
}
}

/// Reset the message counter.
pub fn reset(&mut self) {
self.total.store(0, std::sync::atomic::Ordering::SeqCst);
self.with_high_priority.store(0, std::sync::atomic::Ordering::SeqCst);
}

/// Get the messages with high priority count.
pub fn with_high_priority(&self) -> usize {
self.with_high_priority.load(std::sync::atomic::Ordering::SeqCst)
}
}

/// Make a test subsystem context with buffered overseer channel. Some tests (e.g.
/// `dispute-coordinator`) create too many parallel operations and deadlock unless
/// the channel is buffered. Usually `buffer_size=1` is enough.
Expand All @@ -331,15 +376,23 @@ pub fn make_buffered_subsystem_context<M, S>(
) -> (TestSubsystemContext<M, SpawnGlue<S>>, TestSubsystemContextHandle<M>) {
let (overseer_tx, overseer_rx) = mpsc::channel(buffer_size);
let (all_messages_tx, all_messages_rx) = mpsc::unbounded();
let message_counter = MessageCounter::default();

(
TestSubsystemContext {
tx: TestSubsystemSender { tx: all_messages_tx },
tx: TestSubsystemSender {
tx: all_messages_tx,
message_counter: message_counter.clone(),
},
rx: overseer_rx,
spawn: SpawnGlue(spawner),
message_buffer: VecDeque::new(),
},
TestSubsystemContextHandle { tx: overseer_tx, rx: all_messages_rx },
TestSubsystemContextHandle {
tx: overseer_tx,
rx: all_messages_rx,
message_counter: message_counter.clone(),
},
)
}

Expand Down

0 comments on commit 33b293f

Please sign in to comment.