diff --git a/libs/traits/src/liquidity_pools.rs b/libs/traits/src/liquidity_pools.rs index 94f90c55a4..e3b8cafe69 100644 --- a/libs/traits/src/liquidity_pools.rs +++ b/libs/traits/src/liquidity_pools.rs @@ -23,30 +23,17 @@ use sp_std::vec::Vec; pub trait LPEncoding: Sized { fn serialize(&self) -> Vec; fn deserialize(input: &[u8]) -> Result; -} -#[cfg(any(test, feature = "std"))] -pub mod test_util { - use parity_scale_codec::{Decode, Encode, MaxEncodedLen}; - use scale_info::TypeInfo; - - use super::*; - - #[derive(Default, Debug, Eq, PartialEq, Clone, Encode, Decode, TypeInfo, MaxEncodedLen)] - pub struct Message; - impl LPEncoding for Message { - fn serialize(&self) -> Vec { - vec![0x42] - } - - fn deserialize(input: &[u8]) -> Result { - match input.first() { - Some(0x42) => Ok(Self), - Some(_) => Err("unsupported message".into()), - None => Err("empty message".into()), - } - } - } + /// Extend this message with a new one + fn pack_with(&mut self, other: Self) -> DispatchResult; + + /// Decompose the message into a list of messages + /// If the message is not decomposable, it returns the own message. + fn submessages(&self) -> Vec; + + /// Creates an empty message. + /// It's the identity message for composing messages with pack_with + fn empty() -> Self; } /// The trait required for sending outbound messages. diff --git a/libs/utils/src/lib.rs b/libs/utils/src/lib.rs index a1f8bc7412..189cfe8b7e 100644 --- a/libs/utils/src/lib.rs +++ b/libs/utils/src/lib.rs @@ -16,6 +16,26 @@ use parity_scale_codec::Encode; use sp_std::cmp::min; +pub struct BufferReader<'a>(pub &'a [u8]); + +impl<'a> BufferReader<'a> { + pub fn read_bytes(&mut self, bytes: usize) -> Option<&[u8]> { + if self.0.len() < bytes { + return None; + } + + let (consumed, remaining) = self.0.split_at(bytes); + self.0 = remaining; + Some(consumed) + } + + pub fn read_array(&mut self) -> Option<&[u8; N]> { + let (consumed, remaining) = self.0.split_first_chunk::()?; + self.0 = remaining; + Some(consumed) + } +} + /// Build a fixed-size array using as many elements from `src` as possible /// without overflowing and ensuring that the array is 0 padded in the case /// where `src.len()` is smaller than S. diff --git a/pallets/liquidity-pools-gateway/axelar-gateway-precompile/src/lib.rs b/pallets/liquidity-pools-gateway/axelar-gateway-precompile/src/lib.rs index dcd1d3b134..fddc1fec92 100644 --- a/pallets/liquidity-pools-gateway/axelar-gateway-precompile/src/lib.rs +++ b/pallets/liquidity-pools-gateway/axelar-gateway-precompile/src/lib.rs @@ -287,7 +287,7 @@ where exit_status: ExitError::Other("account bytes mismatch for domain".into()), })?; - match pallet_liquidity_pools_gateway::Pallet::::process_msg( + match pallet_liquidity_pools_gateway::Pallet::::receive_message( pallet_liquidity_pools_gateway::GatewayOrigin::Domain(domain_address).into(), msg, ) diff --git a/pallets/liquidity-pools-gateway/queue/src/mock.rs b/pallets/liquidity-pools-gateway/queue/src/mock.rs index 23c8d6ba12..3785d5d35e 100644 --- a/pallets/liquidity-pools-gateway/queue/src/mock.rs +++ b/pallets/liquidity-pools-gateway/queue/src/mock.rs @@ -13,7 +13,6 @@ use cfg_mocks::pallet_mock_liquidity_pools_gateway; use cfg_primitives::LPGatewayQueueMessageNonce; -use cfg_traits::liquidity_pools::test_util::Message as LPTestMessage; use cfg_types::domain_address::Domain; use frame_support::derive_impl; use sp_runtime::traits::ConstU128; @@ -46,11 +45,11 @@ impl pallet_balances::Config for Runtime { impl pallet_mock_liquidity_pools_gateway::Config for Runtime { type Destination = Domain; - type Message = LPTestMessage; + type Message = (); } impl Config for Runtime { - type Message = LPTestMessage; + type Message = (); type MessageNonce = LPGatewayQueueMessageNonce; type MessageProcessor = LPGatewayMock; type RuntimeEvent = RuntimeEvent; diff --git a/pallets/liquidity-pools-gateway/queue/src/tests.rs b/pallets/liquidity-pools-gateway/queue/src/tests.rs index 237b076397..e48bcd4e8d 100644 --- a/pallets/liquidity-pools-gateway/queue/src/tests.rs +++ b/pallets/liquidity-pools-gateway/queue/src/tests.rs @@ -1,7 +1,5 @@ use cfg_primitives::LPGatewayQueueMessageNonce; -use cfg_traits::liquidity_pools::{ - test_util::Message as LPTestMessage, MessageQueue as MessageQueueT, -}; +use cfg_traits::liquidity_pools::MessageQueue as MessageQueueT; use frame_support::{assert_noop, assert_ok, dispatch::RawOrigin}; use sp_runtime::{ traits::{BadOrigin, One, Zero}, @@ -35,7 +33,7 @@ mod process_message { #[test] fn success() { new_test_ext().execute_with(|| { - let message = LPTestMessage {}; + let message = (); let nonce = LPGatewayQueueMessageNonce::one(); MessageQueue::::insert(nonce, message.clone()); @@ -87,7 +85,7 @@ mod process_message { #[test] fn failure_message_processor() { new_test_ext().execute_with(|| { - let message = LPTestMessage {}; + let message = (); let nonce = LPGatewayQueueMessageNonce::one(); MessageQueue::::insert(nonce, message.clone()); @@ -126,7 +124,7 @@ mod process_failed_message { #[test] fn success() { new_test_ext().execute_with(|| { - let message = LPTestMessage {}; + let message = (); let nonce = LPGatewayQueueMessageNonce::one(); let error = DispatchError::Unavailable; @@ -179,7 +177,7 @@ mod process_failed_message { #[test] fn failure_message_processor() { new_test_ext().execute_with(|| { - let message = LPTestMessage {}; + let message = (); let nonce = LPGatewayQueueMessageNonce::one(); let error = DispatchError::Unavailable; @@ -218,7 +216,7 @@ mod message_queue_impl { #[test] fn success() { new_test_ext().execute_with(|| { - let message = LPTestMessage {}; + let message = (); assert_ok!(LPGatewayQueue::submit(message.clone())); diff --git a/pallets/liquidity-pools-gateway/src/lib.rs b/pallets/liquidity-pools-gateway/src/lib.rs index d38415409b..aeb41d84e1 100644 --- a/pallets/liquidity-pools-gateway/src/lib.rs +++ b/pallets/liquidity-pools-gateway/src/lib.rs @@ -38,7 +38,7 @@ use cfg_traits::{ }; use cfg_types::domain_address::{Domain, DomainAddress}; use frame_support::{dispatch::DispatchResult, pallet_prelude::*, PalletError}; -use frame_system::pallet_prelude::OriginFor; +use frame_system::pallet_prelude::{ensure_signed, OriginFor}; use message::GatewayMessage; use orml_traits::GetByKey; pub use pallet::*; @@ -208,6 +208,13 @@ pub mod pallet { pub type DomainHookAddress = StorageMap<_, Blake2_128Concat, Domain, [u8; 20], OptionQuery>; + /// Stores a batch message, not ready yet to be enqueue. + /// Lifetime handled by `start_pack_messages()` and `end_pack_messages()` + /// extrinsics. + #[pallet::storage] + pub(crate) type PackedMessage = + StorageMap<_, Blake2_128Concat, (T::AccountId, Domain), T::Message>; + #[pallet::error] pub enum Error { /// Router initialization failed. @@ -219,9 +226,6 @@ pub mod pallet { /// The domain is not supported. DomainNotSupported, - /// Message decoding error. - MessageDecodingFailed, - /// Instance was already added to the domain. InstanceAlreadyAdded, @@ -246,6 +250,14 @@ pub mod pallet { /// Decoding that is essential and this error /// signals malforming of the wrapping information. RelayerMessageDecodingFailed { reason: RelayerMessageDecodingError }, + + /// Emitted when you call `start_pack_messages()` but that was already + /// called. You should finalize the message with `end_pack_messages()` + MessagePackingAlreadyStarted, + + /// Emitted when you can `end_pack_messages()` but the packing process + /// was not started by `start_pack_messages()`. + MessagePackingNotStarted, } #[pallet::call] @@ -356,16 +368,14 @@ pub mod pallet { } /// Process an inbound message. - #[pallet::weight(T::WeightInfo::process_msg())] + #[pallet::weight(T::WeightInfo::receive_message())] #[pallet::call_index(5)] - pub fn process_msg( + pub fn receive_message( origin: OriginFor, msg: BoundedVec, ) -> DispatchResult { - let (domain_address, incoming_msg) = match T::LocalEVMOrigin::ensure_origin(origin)? { - GatewayOrigin::Domain(domain_address) => { - Pallet::::validate(domain_address, msg)? - } + let (origin_address, incoming_msg) = match T::LocalEVMOrigin::ensure_origin(origin)? { + GatewayOrigin::Domain(domain_address) => (domain_address, msg), GatewayOrigin::AxelarRelay(domain_address) => { // Every axelar relay address has a separate storage ensure!( @@ -376,88 +386,58 @@ pub mod pallet { // Every axelar relay will prepend the (sourceChain, // sourceAddress) from actual origination chain to the // message bytes, with a length identifier - let slice_ref = &mut msg.as_slice(); - let length_source_chain: usize = Pallet::::try_range( - slice_ref, - BYTES_U32, - Error::::from(MalformedSourceChainLength), - |be_bytes_u32| { - let mut bytes = [0u8; BYTES_U32]; - // NOTE: This can NEVER panic as the `try_range` logic ensures the given - // bytes have the right length. I.e. 4 in this case - bytes.copy_from_slice(be_bytes_u32); - - u32::from_be_bytes(bytes).try_into().map_err(|_| { - DispatchError::Other("Expect: usize in wasm is always ge u32") - }) - }, - )?; - - let source_chain = Pallet::::try_range( - slice_ref, - length_source_chain, - Error::::from(MalformedSourceChain), - |source_chain| Ok(source_chain.to_vec()), - )?; - - let length_source_address: usize = Pallet::::try_range( - slice_ref, - BYTES_U32, - Error::::from(MalformedSourceAddressLength), - |be_bytes_u32| { - let mut bytes = [0u8; BYTES_U32]; - // NOTE: This can NEVER panic as the `try_range` logic ensures the given - // bytes have the right length. I.e. 4 in this case - bytes.copy_from_slice(be_bytes_u32); - - u32::from_be_bytes(bytes).try_into().map_err(|_| { - DispatchError::Other("Expect: usize in wasm is always ge u32") - }) - }, - )?; - - let source_address = Pallet::::try_range( - slice_ref, - length_source_address, - Error::::from(MalformedSourceAddress), - |source_address| { + let mut input = cfg_utils::BufferReader(msg.as_slice()); + + let length_source_chain = match input.read_array::() { + Some(bytes) => u32::from_be_bytes(*bytes), + None => Err(Error::::from(MalformedSourceChainLength))?, + }; + + let source_chain = match input.read_bytes(length_source_chain as usize) { + Some(bytes) => bytes.to_vec(), + None => Err(Error::::from(MalformedSourceChain))?, + }; + + let length_source_address = match input.read_array::() { + Some(bytes) => u32::from_be_bytes(*bytes), + None => Err(Error::::from(MalformedSourceAddressLength))?, + }; + + let source_address = match input.read_bytes(length_source_address as usize) { + Some(bytes) => { // NOTE: Axelar simply provides the hexadecimal string of an EVM - // address as the `sourceAddress` argument. Solidity does on the - // other side recognize the hex-encoding and encode the hex bytes - // to utf-8 bytes. + // address as the `sourceAddress` argument. Solidity does on + // the other side recognize the hex-encoding and + // encode the hex bytes to utf-8 bytes. // // Hence, we are reverting this process here. - let source_address = - cfg_utils::decode_var_source::(source_address) - .ok_or(Error::::from(MalformedSourceAddress))?; - - Ok(source_address.to_vec()) - }, - )?; - - let origin_msg = Pallet::::try_range( - slice_ref, - slice_ref.len(), - Error::::from(MalformedMessage), - |msg| { - BoundedVec::try_from(msg.to_vec()).map_err(|_| { - DispatchError::Other( - "Remaining bytes smaller vector in the first place. qed.", - ) - }) - }, - )?; - - let origin_domain = - T::OriginRecovery::try_convert((source_chain, source_address))?; - - Pallet::::validate(origin_domain, origin_msg)? + cfg_utils::decode_var_source::(bytes) + .ok_or(Error::::from(MalformedSourceAddress))? + .to_vec() + } + None => Err(Error::::from(MalformedSourceAddress))?, + }; + + ( + T::OriginRecovery::try_convert((source_chain, source_address))?, + BoundedVec::try_from(input.0.to_vec()) + .map_err(|_| Error::::from(MalformedMessage))?, + ) } }; + if let DomainAddress::Centrifuge(_) = origin_address { + return Err(Error::::InvalidMessageOrigin.into()); + } + + ensure!( + Allowlist::::contains_key(origin_address.domain(), origin_address.clone()), + Error::::UnknownInstance, + ); + let gateway_message = GatewayMessage::::Inbound { - domain_address, - message: incoming_msg, + domain_address: origin_address, + message: T::Message::deserialize(&incoming_msg)?, }; T::MessageQueue::submit(gateway_message) @@ -485,47 +465,42 @@ pub mod pallet { Ok(()) } - } - impl Pallet { - pub(crate) fn try_range<'a, D, F>( - slice: &mut &'a [u8], - next_steps: usize, - error: Error, - transformer: F, - ) -> Result - where - F: Fn(&'a [u8]) -> Result, - { - ensure!(slice.len() >= next_steps, error); - - let (input, new_slice) = slice.split_at(next_steps); - let res = transformer(input)?; - *slice = new_slice; - - Ok(res) + /// Start packing messages in a single message instead of enqueue + /// messages. + /// The message will be enqueued once `end_pack_messages()` is called. + #[pallet::weight(T::WeightInfo::start_pack_messages())] + #[pallet::call_index(9)] + pub fn start_batch_message(origin: OriginFor, destination: Domain) -> DispatchResult { + let sender = ensure_signed(origin)?; + + PackedMessage::::mutate((&sender, &destination), |msg| match msg { + Some(_) => Err(Error::::MessagePackingAlreadyStarted.into()), + None => { + *msg = Some(T::Message::empty()); + Ok(()) + } + }) } - fn validate( - address: DomainAddress, - msg: BoundedVec, - ) -> Result<(DomainAddress, T::Message), DispatchError> { - if let DomainAddress::Centrifuge(_) = address { - return Err(Error::::InvalidMessageOrigin.into()); + /// End packing messages. + /// If exists any batch message it will be enqueued. + /// Empty batches are no-op + #[pallet::weight(T::WeightInfo::end_pack_messages())] + #[pallet::call_index(10)] + pub fn end_batch_message(origin: OriginFor, destination: Domain) -> DispatchResult { + let sender = ensure_signed(origin)?; + + match PackedMessage::::take((&sender, &destination)) { + Some(msg) if msg.submessages().is_empty() => Ok(()), //No-op + Some(message) => Self::queue_message(destination, message), + None => Err(Error::::MessagePackingNotStarted.into()), } - - ensure!( - Allowlist::::contains_key(address.domain(), address.clone()), - Error::::UnknownInstance, - ); - - let incoming_msg = T::Message::deserialize(msg.as_slice()) - .map_err(|_| Error::::MessageDecodingFailed)?; - - Ok((address, incoming_msg)) } + } - /// Sends the message to the `InboundMessageHandler`. + impl Pallet { + /// Give the message to the `InboundMessageHandler` to be processed. fn process_inbound_message( domain_address: DomainAddress, message: T::Message, @@ -533,10 +508,18 @@ pub mod pallet { let weight = Weight::from_parts(0, T::Message::max_encoded_len() as u64) .saturating_add(LP_DEFENSIVE_WEIGHT); - match T::InboundMessageHandler::handle(domain_address, message) { - Ok(_) => (Ok(()), weight), - Err(e) => (Err(e), weight), + let mut count = 0; + for submessage in message.submessages() { + count += 1; + + if let Err(e) = T::InboundMessageHandler::handle(domain_address.clone(), submessage) + { + // We only consume the processed weight if error during the batch + return (Err(e), weight.saturating_mul(count)); + } } + + (Ok(()), weight.saturating_mul(count)) } /// Retrieves the router stored for the provided domain, sends the @@ -547,71 +530,51 @@ pub mod pallet { domain: Domain, message: T::Message, ) -> (DispatchResult, Weight) { - let mut weight = T::DbWeight::get().reads(1); + let read_weight = T::DbWeight::get().reads(1); - let router = match DomainRouters::::get(domain) { - Some(r) => r, - None => return (Err(Error::::RouterNotFound.into()), weight), + let Some(router) = DomainRouters::::get(domain) else { + return (Err(Error::::RouterNotFound.into()), read_weight); }; - match router.send(sender.clone(), message.serialize()) { - Ok(dispatch_info) => Self::update_total_post_dispatch_info_weight( - &mut weight, - dispatch_info.actual_weight, - ), - Err(e) => { - Self::update_total_post_dispatch_info_weight( - &mut weight, - e.post_info.actual_weight, - ); + let serialized = message.serialize(); + let serialized_len = serialized.len() as u64; - return (Err(e.error), weight); - } - } + // TODO: do we really need to return the weights in send() if later we use the + // defensive ones? + let (result, router_weight) = match router.send(sender, serialized) { + Ok(dispatch_info) => (Ok(()), dispatch_info.actual_weight), + Err(e) => (Err(e.error), e.post_info.actual_weight), + }; - (Ok(()), weight) + ( + result, + router_weight + .unwrap_or(Weight::from_parts(LP_DEFENSIVE_WEIGHT_REF_TIME, 0)) + .saturating_add(read_weight) + .saturating_add(Weight::from_parts(0, serialized_len)), + ) } - /// Updates the provided `PostDispatchInfo` with the weight required to - /// process an outbound message. - pub(crate) fn update_total_post_dispatch_info_weight( - weight: &mut Weight, - router_call_weight: Option, - ) { - let router_call_weight = - Self::get_outbound_message_processing_weight(router_call_weight); - - *weight = weight.saturating_add(router_call_weight); - } + fn queue_message(destination: Domain, message: T::Message) -> DispatchResult { + // We are using the sender specified in the pallet config so that we can + // ensure that the account is funded + let gateway_message = GatewayMessage::::Outbound { + sender: T::Sender::get(), + destination, + message, + }; - /// Calculates the weight used by a router when processing an outbound - /// message. - fn get_outbound_message_processing_weight(router_call_weight: Option) -> Weight { - let pov_weight: u64 = (Domain::max_encoded_len() - + T::AccountId::max_encoded_len() - + T::Message::max_encoded_len()) - .try_into() - .expect("can calculate outbound message POV weight"); - - router_call_weight - .unwrap_or(Weight::from_parts(LP_DEFENSIVE_WEIGHT_REF_TIME, 0)) - .saturating_add(Weight::from_parts(0, pov_weight)) + T::MessageQueue::submit(gateway_message) } } - /// The `OutboundMessageHandler` implementation ensures that outbound - /// messages are queued accordingly. - /// - /// NOTE - the sender provided as an argument is not used at the moment, we - /// are using the sender specified in the pallet config so that we can - /// ensure that the account is funded. impl OutboundMessageHandler for Pallet { type Destination = Domain; type Message = T::Message; type Sender = T::AccountId; fn handle( - _sender: Self::Sender, + from: Self::Sender, destination: Self::Destination, message: Self::Message, ) -> DispatchResult { @@ -620,13 +583,10 @@ pub mod pallet { Error::::DomainNotSupported ); - let gateway_message = GatewayMessage::::Outbound { - sender: T::Sender::get(), - destination, - message, - }; - - T::MessageQueue::submit(gateway_message) + PackedMessage::::mutate((&from, destination.clone()), |batch| match batch { + Some(batch) => batch.pack_with(message), + None => Self::queue_message(destination, message), + }) } } diff --git a/pallets/liquidity-pools-gateway/src/mock.rs b/pallets/liquidity-pools-gateway/src/mock.rs index b74d4b95fe..83d8e9b5da 100644 --- a/pallets/liquidity-pools-gateway/src/mock.rs +++ b/pallets/liquidity-pools-gateway/src/mock.rs @@ -2,12 +2,14 @@ use cfg_mocks::{ pallet_mock_liquidity_pools, pallet_mock_liquidity_pools_gateway_queue, pallet_mock_routers, RouterMock, }; -use cfg_traits::liquidity_pools::test_util::Message; +use cfg_traits::liquidity_pools::LPEncoding; use cfg_types::domain_address::DomainAddress; use frame_support::{derive_impl, weights::constants::RocksDbWeight}; +use parity_scale_codec::{Decode, Encode, MaxEncodedLen}; use runtime_common::origin::EnsureAccountOrRoot; +use scale_info::TypeInfo; use sp_core::{crypto::AccountId32, H256}; -use sp_runtime::traits::IdentityLookup; +use sp_runtime::{traits::IdentityLookup, DispatchError, DispatchResult}; use crate::{pallet as pallet_liquidity_pools_gateway, EnsureLocal, GatewayMessage}; @@ -20,6 +22,67 @@ pub const SOURCE_ADDRESS: [u8; LENGTH_SOURCE_ADDRESS] = [0u8; LENGTH_SOURCE_ADDR pub const LP_ADMIN_ACCOUNT: AccountId32 = AccountId32::new([u8::MAX; 32]); +pub const MAX_PACKED_MESSAGES_ERR: &str = "packed limit error"; +pub const MAX_PACKED_MESSAGES: usize = 10; + +#[derive(Default, Debug, Eq, PartialEq, Clone, Encode, Decode, TypeInfo)] +pub enum Message { + #[default] + Simple, + Pack(Vec), +} + +/// Avoiding automatic infinity loop with the MaxEncodedLen derive +impl MaxEncodedLen for Message { + fn max_encoded_len() -> usize { + 4 + MAX_PACKED_MESSAGES + } +} + +impl LPEncoding for Message { + fn serialize(&self) -> Vec { + match self { + Self::Simple => vec![0x42], + Self::Pack(list) => list.iter().map(|_| 0x42).collect(), + } + } + + fn deserialize(input: &[u8]) -> Result { + Ok(match input.len() { + 0 => unimplemented!(), + 1 => Self::Simple, + n => Self::Pack(sp_std::iter::repeat(Self::Simple).take(n).collect()), + }) + } + + fn pack_with(&mut self, other: Self) -> DispatchResult { + match self { + Self::Simple => { + *self = Self::Pack(vec![Self::Simple, other]); + Ok(()) + } + Self::Pack(list) if list.len() == MAX_PACKED_MESSAGES => { + Err(MAX_PACKED_MESSAGES_ERR.into()) + } + Self::Pack(list) => { + list.push(other); + Ok(()) + } + } + } + + fn submessages(&self) -> Vec { + match self { + Self::Simple => vec![Self::Simple], + Self::Pack(list) => list.clone(), + } + } + + fn empty() -> Self { + Self::Pack(vec![]) + } +} + frame_support::construct_runtime!( pub enum Runtime { System: frame_system, diff --git a/pallets/liquidity-pools-gateway/src/tests.rs b/pallets/liquidity-pools-gateway/src/tests.rs index e0c1428548..961c8cb966 100644 --- a/pallets/liquidity-pools-gateway/src/tests.rs +++ b/pallets/liquidity-pools-gateway/src/tests.rs @@ -1,8 +1,6 @@ use cfg_mocks::*; use cfg_primitives::LP_DEFENSIVE_WEIGHT; -use cfg_traits::liquidity_pools::{ - test_util::Message, LPEncoding, MessageProcessor, OutboundMessageHandler, -}; +use cfg_traits::liquidity_pools::{LPEncoding, MessageProcessor, OutboundMessageHandler}; use cfg_types::domain_address::*; use frame_support::{ assert_noop, assert_ok, dispatch::PostDispatchInfo, pallet_prelude::Pays, weights::Weight, @@ -39,97 +37,6 @@ mod utils { use utils::*; -mod pallet_internals { - - use super::*; - - #[test] - fn try_range_fails_if_slice_to_short() { - new_test_ext().execute_with(|| { - let three_bytes = [0u8; 3]; - let steps = 4usize; - - assert_noop!( - Pallet::::try_range( - &mut three_bytes.as_slice(), - steps, - Error::::MessageDecodingFailed, - |_| Ok(()) - ), - Error::::MessageDecodingFailed - ); - }) - } - - #[test] - fn try_range_updates_slice_ref_correctly() { - new_test_ext().execute_with(|| { - let bytes = [1, 2, 3, 4, 5, 6, 7u8]; - let slice = &mut bytes.as_slice(); - let steps = 4; - let first_section = Pallet::::try_range( - slice, - steps, - Error::::MessageDecodingFailed, - |first_section| Ok(first_section), - ) - .expect("Slice is long enough"); - - assert_eq!(first_section, &[1, 2, 3, 4]); - - let steps = 2; - let second_section = Pallet::::try_range( - slice, - steps, - Error::::MessageDecodingFailed, - |second_section| Ok(second_section), - ) - .expect("Slice is long enough"); - - assert_eq!(&second_section, &[5, 6]); - - let steps = 1; - let third_section = Pallet::::try_range( - slice, - steps, - Error::::MessageDecodingFailed, - |third_section| Ok(third_section), - ) - .expect("Slice is long enough"); - - assert_eq!(&third_section, &[7]); - }) - } - - #[test] - fn try_range_does_not_update_slice_if_transformer_errors() { - new_test_ext().execute_with(|| { - let bytes = [1, 2, 3, 4, 5, 6, 7u8]; - let slice = &mut bytes.as_slice(); - let steps = 4; - let first_section = Pallet::::try_range( - slice, - steps, - Error::::MessageDecodingFailed, - |first_section| Ok(first_section), - ) - .expect("Slice is long enough"); - - assert_eq!(first_section, &[1, 2, 3, 4]); - - let steps = 1; - assert!(Pallet::::try_range( - slice, - steps, - Error::::MessageDecodingFailed, - |_| Err::<(), _>(DispatchError::Corruption) - ) - .is_err()); - assert_eq!(slice, &[5, 6, 7]); - }) - } -} - mod set_domain_router { use super::*; @@ -429,7 +336,7 @@ mod remove_instance { } } -mod process_msg_axelar_relay { +mod receive_message_axelar_relay { use sp_core::bounded::BoundedVec; use super::*; @@ -454,10 +361,10 @@ mod process_msg_axelar_relay { )); - let expected_msg = Message; + let expected_msg = Message::Simple; let expected_domain_address = domain_address.clone(); - let inbound_message = GatewayMessage::::Inbound { domain_address: expected_domain_address, message: expected_msg }; + let inbound_message = GatewayMessage::::Inbound { domain_address: expected_domain_address, message: expected_msg.clone() }; MockLiquidityPoolsGatewayQueue::mock_submit(move |mock_message| { assert_eq!(mock_message, inbound_message); @@ -476,9 +383,9 @@ mod process_msg_axelar_relay { }); let solidity_header = "0000000a657468657265756d2d320000002a307838353033623434353242663632333863433736436462454532323362343664373139366231633933"; - let payload = [hex::decode(solidity_header).unwrap(), Message.serialize()].concat(); + let payload = [hex::decode(solidity_header).unwrap(), expected_msg.serialize()].concat(); - assert_ok!(LiquidityPoolsGateway::process_msg( + assert_ok!(LiquidityPoolsGateway::receive_message( GatewayOrigin::AxelarRelay(relayer_address).into(), BoundedVec::::try_from(payload).unwrap() )); @@ -491,7 +398,7 @@ mod process_msg_axelar_relay { let address = H160::from_slice(&get_test_account_id().as_slice()[..20]); let domain_address = DomainAddress::EVM(SOURCE_CHAIN_EVM_ID, SOURCE_ADDRESS); let relayer_address = DomainAddress::EVM(0, address.into()); - let message = Message; + let message = Message::Simple; assert_ok!(LiquidityPoolsGateway::add_instance( RuntimeOrigin::root(), domain_address.clone(), @@ -530,7 +437,7 @@ mod process_msg_axelar_relay { msg.extend_from_slice(&SOURCE_ADDRESS); msg.extend_from_slice(&message.serialize()); - assert_ok!(LiquidityPoolsGateway::process_msg( + assert_ok!(LiquidityPoolsGateway::receive_message( GatewayOrigin::AxelarRelay(relayer_address).into(), BoundedVec::::try_from(msg).unwrap() )); @@ -543,7 +450,7 @@ mod process_msg_axelar_relay { let address = H160::from_slice(&get_test_account_id().as_slice()[..20]); let domain_address = DomainAddress::EVM(SOURCE_CHAIN_EVM_ID, SOURCE_ADDRESS); let relayer_address = DomainAddress::EVM(0, address.into()); - let message = Message; + let message = Message::Simple; assert_ok!(LiquidityPoolsGateway::add_instance( RuntimeOrigin::signed(LP_ADMIN_ACCOUNT), @@ -583,7 +490,7 @@ mod process_msg_axelar_relay { msg.extend_from_slice(&SOURCE_ADDRESS); msg.extend_from_slice(&message.serialize()); - assert_ok!(LiquidityPoolsGateway::process_msg( + assert_ok!(LiquidityPoolsGateway::receive_message( GatewayOrigin::AxelarRelay(relayer_address).into(), BoundedVec::::try_from(msg).unwrap() )); @@ -602,7 +509,7 @@ mod process_msg_axelar_relay { relayer_address.clone(), )); - let expected_msg = Message; + let expected_msg = Message::Simple; let mut msg = Vec::new(); @@ -621,7 +528,7 @@ mod process_msg_axelar_relay { }); assert_noop!( - LiquidityPoolsGateway::process_msg( + LiquidityPoolsGateway::receive_message( GatewayOrigin::AxelarRelay(relayer_address).into(), BoundedVec::::try_from(msg).unwrap() ), @@ -644,7 +551,7 @@ mod process_msg_axelar_relay { relayer_address.clone(), )); - let expected_msg = Message; + let expected_msg = Message::Simple; let mut msg = Vec::new(); msg.extend_from_slice(&(LENGTH_SOURCE_CHAIN as u32).to_be_bytes()); @@ -665,7 +572,7 @@ mod process_msg_axelar_relay { }); assert_noop!( - LiquidityPoolsGateway::process_msg( + LiquidityPoolsGateway::receive_message( GatewayOrigin::AxelarRelay(relayer_address).into(), BoundedVec::::try_from(msg).unwrap() ), @@ -674,45 +581,6 @@ mod process_msg_axelar_relay { }); } - #[test] - fn message_decode() { - new_test_ext().execute_with(|| { - let address = H160::from_slice(&get_test_account_id().as_slice()[..20]); - let domain_address = DomainAddress::EVM(SOURCE_CHAIN_EVM_ID, SOURCE_ADDRESS); - let relayer_address = DomainAddress::EVM(0, address.into()); - - assert_ok!(LiquidityPoolsGateway::add_instance( - RuntimeOrigin::root(), - domain_address.clone(), - )); - - assert_ok!(LiquidityPoolsGateway::add_relayer( - RuntimeOrigin::root(), - relayer_address.clone(), - )); - - let encoded_msg: Vec = vec![11]; - let expected_domain_address = domain_address.clone(); - - MockOriginRecovery::mock_try_convert(move |origin| { - let (source_chain, source_address) = origin; - - assert_eq!(&source_chain, SOURCE_CHAIN.as_slice()); - assert_eq!(&source_address, SOURCE_ADDRESS.as_slice()); - - Ok(expected_domain_address.clone()) - }); - - assert_noop!( - LiquidityPoolsGateway::process_msg( - GatewayOrigin::Domain(domain_address).into(), - BoundedVec::::try_from(encoded_msg).unwrap() - ), - Error::::MessageDecodingFailed, - ); - }); - } - #[test] fn message_queue_error() { new_test_ext().execute_with(|| { @@ -730,7 +598,7 @@ mod process_msg_axelar_relay { relayer_address.clone(), )); - let expected_msg = Message; + let expected_msg = Message::Simple; let encoded_msg = expected_msg.serialize(); let expected_domain_address = domain_address.clone(); @@ -742,7 +610,7 @@ mod process_msg_axelar_relay { MockLiquidityPoolsGatewayQueue::mock_submit(move |_| Err(err)); assert_noop!( - LiquidityPoolsGateway::process_msg( + LiquidityPoolsGateway::receive_message( GatewayOrigin::Domain(domain_address).into(), BoundedVec::::try_from(encoded_msg).unwrap() ), @@ -752,7 +620,7 @@ mod process_msg_axelar_relay { } } -mod process_msg_domain { +mod receive_message_domain { use super::*; #[test] @@ -760,7 +628,7 @@ mod process_msg_domain { new_test_ext().execute_with(|| { let address = H160::from_slice(&get_test_account_id().as_slice()[..20]); let domain_address = DomainAddress::EVM(0, address.into()); - let message = Message; + let message = Message::Simple; assert_ok!(LiquidityPoolsGateway::add_instance( RuntimeOrigin::root(), @@ -779,7 +647,7 @@ mod process_msg_domain { Ok(()) }); - assert_ok!(LiquidityPoolsGateway::process_msg( + assert_ok!(LiquidityPoolsGateway::receive_message( GatewayOrigin::Domain(domain_address).into(), BoundedVec::::try_from(encoded_msg).unwrap() )); @@ -789,10 +657,10 @@ mod process_msg_domain { #[test] fn bad_origin() { new_test_ext().execute_with(|| { - let encoded_msg = Message.serialize(); + let encoded_msg = Message::Simple.serialize(); assert_noop!( - LiquidityPoolsGateway::process_msg( + LiquidityPoolsGateway::receive_message( RuntimeOrigin::root(), BoundedVec::::try_from(encoded_msg).unwrap() ), @@ -805,10 +673,10 @@ mod process_msg_domain { fn invalid_message_origin() { new_test_ext().execute_with(|| { let domain_address = DomainAddress::Centrifuge(get_test_account_id().into()); - let encoded_msg = Message.serialize(); + let encoded_msg = Message::Simple.serialize(); assert_noop!( - LiquidityPoolsGateway::process_msg( + LiquidityPoolsGateway::receive_message( GatewayOrigin::Domain(domain_address).into(), BoundedVec::::try_from(encoded_msg).unwrap() ), @@ -822,10 +690,10 @@ mod process_msg_domain { new_test_ext().execute_with(|| { let address = H160::from_slice(&get_test_account_id().as_slice()[..20]); let domain_address = DomainAddress::EVM(0, address.into()); - let encoded_msg = Message.serialize(); + let encoded_msg = Message::Simple.serialize(); assert_noop!( - LiquidityPoolsGateway::process_msg( + LiquidityPoolsGateway::receive_message( GatewayOrigin::Domain(domain_address).into(), BoundedVec::::try_from(encoded_msg).unwrap() ), @@ -834,35 +702,12 @@ mod process_msg_domain { }); } - #[test] - fn message_decode() { - new_test_ext().execute_with(|| { - let address = H160::from_slice(&get_test_account_id().as_slice()[..20]); - let domain_address = DomainAddress::EVM(0, address.into()); - - assert_ok!(LiquidityPoolsGateway::add_instance( - RuntimeOrigin::root(), - domain_address.clone(), - )); - - let encoded_msg: Vec = vec![11]; - - assert_noop!( - LiquidityPoolsGateway::process_msg( - GatewayOrigin::Domain(domain_address).into(), - BoundedVec::::try_from(encoded_msg).unwrap() - ), - Error::::MessageDecodingFailed, - ); - }); - } - #[test] fn message_queue_error() { new_test_ext().execute_with(|| { let address = H160::from_slice(&get_test_account_id().as_slice()[..20]); let domain_address = DomainAddress::EVM(0, address.into()); - let message = Message; + let message = Message::Simple; assert_ok!(LiquidityPoolsGateway::add_instance( RuntimeOrigin::root(), @@ -884,7 +729,7 @@ mod process_msg_domain { }); assert_noop!( - LiquidityPoolsGateway::process_msg( + LiquidityPoolsGateway::receive_message( GatewayOrigin::Domain(domain_address).into(), BoundedVec::::try_from(encoded_msg).unwrap() ), @@ -902,7 +747,7 @@ mod outbound_message_handler_impl { new_test_ext().execute_with(|| { let domain = Domain::EVM(0); let sender = get_test_account_id(); - let msg = Message; + let msg = Message::Simple; let router = RouterMock::::default(); router.mock_init(move || Ok(())); @@ -934,7 +779,7 @@ mod outbound_message_handler_impl { new_test_ext().execute_with(|| { let domain = Domain::Centrifuge; let sender = get_test_account_id(); - let msg = Message; + let msg = Message::Simple; assert_noop!( LiquidityPoolsGateway::handle(sender, domain, msg), @@ -948,7 +793,7 @@ mod outbound_message_handler_impl { new_test_ext().execute_with(|| { let domain = Domain::EVM(0); let sender = get_test_account_id(); - let msg = Message; + let msg = Message::Simple; let router = RouterMock::::default(); router.mock_init(move || Ok(())); @@ -1050,7 +895,7 @@ mod message_processor_impl { fn success() { new_test_ext().execute_with(|| { let domain_address = DomainAddress::EVM(1, [1; 20]); - let message = Message; + let message = Message::Simple; let gateway_message = GatewayMessage::::Inbound { domain_address: domain_address.clone(), message: message.clone(), @@ -1072,7 +917,7 @@ mod message_processor_impl { fn inbound_message_handler_error() { new_test_ext().execute_with(|| { let domain_address = DomainAddress::EVM(1, [1; 20]); - let message = Message; + let message = Message::Simple; let gateway_message = GatewayMessage::::Inbound { domain_address: domain_address.clone(), message: message.clone(), @@ -1105,7 +950,7 @@ mod message_processor_impl { new_test_ext().execute_with(|| { let sender = get_test_account_id(); let domain = Domain::EVM(1); - let message = Message; + let message = Message::Simple; let expected_sender = sender.clone(); let expected_message = message.clone(); @@ -1125,23 +970,19 @@ mod message_processor_impl { DomainRouters::::insert(domain.clone(), router_mock); - let mut expected_weight = - ::DbWeight::get().reads(1); - - Pallet::::update_total_post_dispatch_info_weight( - &mut expected_weight, - router_post_info.actual_weight, - ); + let min_expected_weight = ::DbWeight::get() + .reads(1) + router_post_info.actual_weight.unwrap() + + Weight::from_parts(0, message.serialize().len() as u64); let gateway_message = GatewayMessage::::Outbound { sender, destination: domain, - message, + message: message.clone(), }; let (res, weight) = LiquidityPoolsGateway::process(gateway_message); assert_ok!(res); - assert_eq!(weight, expected_weight); + assert!(weight.all_lte(min_expected_weight)); }); } @@ -1150,7 +991,7 @@ mod message_processor_impl { new_test_ext().execute_with(|| { let sender = get_test_account_id(); let domain = Domain::EVM(1); - let message = Message; + let message = Message::Simple; let expected_weight = ::DbWeight::get().reads(1); @@ -1171,7 +1012,7 @@ mod message_processor_impl { new_test_ext().execute_with(|| { let sender = get_test_account_id(); let domain = Domain::EVM(1); - let message = Message; + let message = Message::Simple; let expected_sender = sender.clone(); let expected_message = message.clone(); @@ -1196,24 +1037,24 @@ mod message_processor_impl { DomainRouters::::insert(domain.clone(), router_mock); - let mut expected_weight = - ::DbWeight::get().reads(1); - - Pallet::::update_total_post_dispatch_info_weight( - &mut expected_weight, - router_post_info.actual_weight, - ); + let min_expected_weight = ::DbWeight::get() + .reads(1) + router_post_info.actual_weight.unwrap() + + Weight::from_parts(0, message.serialize().len() as u64); let gateway_message = GatewayMessage::::Outbound { sender, destination: domain, - message, + message: message.clone(), }; let (res, weight) = LiquidityPoolsGateway::process(gateway_message); assert_noop!(res, router_err); - assert_eq!(weight, expected_weight); + assert!(weight.all_lte(min_expected_weight)); }); } } } + +mod packed { + //TODO +} diff --git a/pallets/liquidity-pools-gateway/src/weights.rs b/pallets/liquidity-pools-gateway/src/weights.rs index d72e6e0f91..32109817e5 100644 --- a/pallets/liquidity-pools-gateway/src/weights.rs +++ b/pallets/liquidity-pools-gateway/src/weights.rs @@ -18,9 +18,11 @@ pub trait WeightInfo { fn remove_instance() -> Weight; fn add_relayer() -> Weight; fn remove_relayer() -> Weight; - fn process_msg() -> Weight; + fn receive_message() -> Weight; fn process_outbound_message() -> Weight; fn process_failed_outbound_message() -> Weight; + fn start_pack_messages() -> Weight; + fn end_pack_messages() -> Weight; } // NOTE: We use temporary weights here. `execute_epoch` is by far our heaviest @@ -84,7 +86,7 @@ impl WeightInfo for () { .saturating_add(RocksDbWeight::get().writes(1)) } - fn process_msg() -> Weight { + fn receive_message() -> Weight { // NOTE: Defensive hardcoded weight taken from pool_system::execute_epoch. Will // be replaced with real benchmark soon. // @@ -122,4 +124,26 @@ impl WeightInfo for () { .saturating_add(RocksDbWeight::get().reads(2)) .saturating_add(RocksDbWeight::get().writes(1)) } + + fn start_pack_messages() -> Weight { + // TODO: BENCHMARK CORRECTLY + // + // NOTE: Reasonable weight taken from `PoolSystem::set_max_reserve` + // This one has one read and one write for sure and possible one + // read for `AdminOrigin` + Weight::from_parts(30_117_000, 5991) + .saturating_add(RocksDbWeight::get().reads(1)) + .saturating_add(RocksDbWeight::get().writes(1)) + } + + fn end_pack_messages() -> Weight { + // TODO: BENCHMARK CORRECTLY + // + // NOTE: Reasonable weight taken from `PoolSystem::set_max_reserve` + // This one has one read and one write for sure and possible one + // read for `AdminOrigin` + Weight::from_parts(30_117_000, 5991) + .saturating_add(RocksDbWeight::get().reads(2)) + .saturating_add(RocksDbWeight::get().writes(2)) + } } diff --git a/pallets/liquidity-pools/src/lib.rs b/pallets/liquidity-pools/src/lib.rs index 8e55c470c3..3f047f923a 100644 --- a/pallets/liquidity-pools/src/lib.rs +++ b/pallets/liquidity-pools/src/lib.rs @@ -339,6 +339,9 @@ pub mod pallet { NotPoolAdmin, /// The domain hook address could not be found. DomainHookAddressNotFound, + /// This pallet does not expect to receive direclty a batch message, + /// instead it expects several calls to it with different messages. + UnsupportedBatchMessage, } #[pallet::call] @@ -1029,6 +1032,7 @@ pub mod pallet { currency.into(), sender, ), + Message::Batch(_) => Err(Error::::UnsupportedBatchMessage.into()), _ => Err(Error::::InvalidIncomingMessage.into()), }?; diff --git a/pallets/liquidity-pools/src/message.rs b/pallets/liquidity-pools/src/message.rs index fc917eee29..d981abfb16 100644 --- a/pallets/liquidity-pools/src/message.rs +++ b/pallets/liquidity-pools/src/message.rs @@ -529,35 +529,34 @@ pub enum Message { }, } -impl Message { - /// Compose this message with a new one - pub fn pack(&self, other: Self) -> Result { - Ok(match self.clone() { - Message::Batch(content) => { - let mut content = content.clone(); - content.try_add(other)?; - Message::Batch(content) +impl LPEncoding for Message { + fn serialize(&self) -> Vec { + gmpf::to_vec(self).unwrap_or_default() + } + + fn deserialize(data: &[u8]) -> Result { + gmpf::from_slice(data).map_err(|_| DispatchError::Other("LP Deserialization issue")) + } + + fn pack_with(&mut self, other: Self) -> Result<(), DispatchError> { + match self { + Message::Batch(content) => content.try_add(other), + this => { + *this = Message::Batch(BatchMessages::try_from(vec![this.clone(), other])?); + Ok(()) } - this => Message::Batch(BatchMessages::try_from(vec![this.clone(), other])?), - }) + } } - /// Decompose the message into a list of messages - pub fn unpack(&self) -> Vec { + fn submessages(&self) -> Vec { match self { Message::Batch(content) => content.clone().into_iter().collect(), message => vec![message.clone()], } } -} -impl LPEncoding for Message { - fn serialize(&self) -> Vec { - gmpf::to_vec(self).unwrap_or_default() - } - - fn deserialize(data: &[u8]) -> Result { - gmpf::from_slice(data).map_err(|_| DispatchError::Other("LP Deserialization issue")) + fn empty() -> Message { + Message::Batch(BatchMessages::default()) } } diff --git a/runtime/common/src/xcm.rs b/runtime/common/src/xcm.rs index f95111acbd..f41382e91b 100644 --- a/runtime/common/src/xcm.rs +++ b/runtime/common/src/xcm.rs @@ -276,9 +276,9 @@ mod test { pallet_mock_liquidity_pools, pallet_mock_liquidity_pools_gateway_queue, pallet_mock_routers, RouterMock, }; - use cfg_traits::liquidity_pools::test_util::Message; use frame_support::{assert_ok, derive_impl, traits::EnsureOrigin}; use frame_system::EnsureRoot; + use pallet_liquidity_pools::Message; use pallet_liquidity_pools_gateway::{message::GatewayMessage, EnsureLocal, GatewayOrigin}; use sp_core::{ConstU32, ConstU64}; use sp_runtime::DispatchError;