From 2683bb79671fcb92ce860bdcc607c9a1fd8c2446 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20K=C3=B6cher?= Date: Fri, 17 Mar 2023 14:45:01 +0100 Subject: [PATCH] dmp-queue: Store messages if already processed more than the maximum (#2343) * dmp-queue: Store messages if already processed more than the maximum * Put new event at the end --- pallets/dmp-queue/src/lib.rs | 120 +++++++++++++++++++++++------------ 1 file changed, 81 insertions(+), 39 deletions(-) diff --git a/pallets/dmp-queue/src/lib.rs b/pallets/dmp-queue/src/lib.rs index 9bdf786ba63..083423b5b62 100644 --- a/pallets/dmp-queue/src/lib.rs +++ b/pallets/dmp-queue/src/lib.rs @@ -190,6 +190,8 @@ pub mod pallet { }, /// Downward message from the overweight queue was executed. OverweightServiced { overweight_index: OverweightIndex, weight_used: Weight }, + /// The maximum number of downward messages was. + MaxMessagesExhausted { message_id: MessageId }, } impl Pallet { @@ -307,46 +309,53 @@ pub mod pallet { }; for (i, (sent_at, data)) in iter.enumerate() { - if messages_processed >= MAX_MESSAGES_PER_BLOCK { - break - } if maybe_enqueue_page.is_none() { - // We're not currently enqueuing - try to execute inline. - let remaining_weight = limit.saturating_sub(used); - messages_processed += 1; - match Self::try_service_message(remaining_weight, sent_at, &data[..]) { - Ok(consumed) => used += consumed, - Err((message_id, required_weight)) => - // Too much weight required right now. - { - let is_under_limit = Overweight::::count() < MAX_OVERWEIGHT_MESSAGES; - used.saturating_accrue(T::DbWeight::get().reads(1)); - if required_weight.any_gt(config.max_individual) && is_under_limit { - // overweight - add to overweight queue and continue with - // message execution. - let overweight_index = page_index.overweight_count; - Overweight::::insert(overweight_index, (sent_at, data)); - Self::deposit_event(Event::OverweightEnqueued { - message_id, - overweight_index, - required_weight, - }); - page_index.overweight_count += 1; - // Not needed for control flow, but only to ensure that the compiler - // understands that we won't attempt to re-use `data` later. - continue - } else { - // not overweight. stop executing inline and enqueue normally - // from here on. - let item_count_left = item_count.saturating_sub(i); - maybe_enqueue_page = Some(Vec::with_capacity(item_count_left)); - Self::deposit_event(Event::WeightExhausted { - message_id, - remaining_weight, - required_weight, - }); - } - }, + if messages_processed >= MAX_MESSAGES_PER_BLOCK { + let item_count_left = item_count.saturating_sub(i); + maybe_enqueue_page = Some(Vec::with_capacity(item_count_left)); + + Self::deposit_event(Event::MaxMessagesExhausted { + message_id: sp_io::hashing::blake2_256(&data), + }); + } else { + // We're not currently enqueuing - try to execute inline. + let remaining_weight = limit.saturating_sub(used); + messages_processed += 1; + match Self::try_service_message(remaining_weight, sent_at, &data[..]) { + Ok(consumed) => used += consumed, + Err((message_id, required_weight)) => + // Too much weight required right now. + { + let is_under_limit = + Overweight::::count() < MAX_OVERWEIGHT_MESSAGES; + used.saturating_accrue(T::DbWeight::get().reads(1)); + if required_weight.any_gt(config.max_individual) && is_under_limit { + // overweight - add to overweight queue and continue with + // message execution. + let overweight_index = page_index.overweight_count; + Overweight::::insert(overweight_index, (sent_at, data)); + Self::deposit_event(Event::OverweightEnqueued { + message_id, + overweight_index, + required_weight, + }); + page_index.overweight_count += 1; + // Not needed for control flow, but only to ensure that the compiler + // understands that we won't attempt to re-use `data` later. + continue + } else { + // not overweight. stop executing inline and enqueue normally + // from here on. + let item_count_left = item_count.saturating_sub(i); + maybe_enqueue_page = Some(Vec::with_capacity(item_count_left)); + Self::deposit_event(Event::WeightExhausted { + message_id, + remaining_weight, + required_weight, + }); + } + }, + } } } // Cannot be an `else` here since the `maybe_enqueue_page` may have changed. @@ -889,4 +898,37 @@ mod tests { assert_eq!(pages_queued(), 1); }); } + + #[test] + fn handle_max_messages_per_block() { + new_test_ext().execute_with(|| { + enqueue(&vec![msg(1000), msg(1001)]); + enqueue(&vec![msg(1002), msg(1003)]); + enqueue(&vec![msg(1004), msg(1005)]); + + let incoming = (0..MAX_MESSAGES_PER_BLOCK) + .into_iter() + .map(|i| msg(1006 + i as u64)) + .collect::>(); + handle_messages(&incoming, Weight::from_parts(25000, 25000)); + + assert_eq!( + take_trace(), + (0..MAX_MESSAGES_PER_BLOCK) + .into_iter() + .map(|i| msg_complete(1000 + i as u64)) + .collect::>(), + ); + assert_eq!(pages_queued(), 1); + + handle_messages(&[], Weight::from_parts(25000, 25000)); + assert_eq!( + take_trace(), + (MAX_MESSAGES_PER_BLOCK..MAX_MESSAGES_PER_BLOCK + 6) + .into_iter() + .map(|i| msg_complete(1000 + i as u64)) + .collect::>(), + ); + }); + } }