-
Notifications
You must be signed in to change notification settings - Fork 2.6k
Use async/await in the NetworkWorker and in build_network_future #5704
Conversation
All tests have passed on the first try, and the node I'm testing this on seems to be behaving properly. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks good to me.
I have a suggestion in regards to the StatusSinks
structure. Hope I am not missing something.
@@ -14,19 +14,27 @@ | |||
// You should have received a copy of the GNU General Public License |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about following the futures::sink::Sink
pattern for StatusSinks
.
StatusSinks
would have two states, initialized with the latter:
-
Ready(YieldAfter)
-
Pending
A user would
-
Call
poll_ready
. In case a outbound sender's interval has fired it would move theStatusSinks
to statusReady(YieldAfter)
and returnPoll::Ready()
. -
Call
start_send
. It would take theYieldAfter
from theReady
status, send the status down the channel and move theStatusSink
status back toPending
.
I would find this easier to parse, not because the current implementation is complex or complicated, but because futures::sink::Sink
is a known familiar pattern. As a nice-to-have it would relief us from the entries_{tx,rx}
channel. If I am not mistaken none of this is performance critical, thus I don't understand the pattern of only locking inner
within next
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't understand the pattern of only locking inner within next.
The reason is that we are sharing an Arc<StatusSinks>
throughout in the networking code.
Calling next()
returns a Future
that holds a lock to the queue until one element is ready. If we locked in push
as well, then as long as next
's Future
is alive (which is 99% of the time), then calling status_sink.push()
would need to wait.
There is no way you can get rid of the entries_tx/rx
channel without making the NetworkService::event_stream()
function blocking or asynchronous
(except by replacing it by something vaguely equivalent to a channel, like a Vec plus a task notifier).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As for the Sink
suggestion, we can't really do that.
We call status_sink::next()
in the NetworkWorker::next_action
async method. But if something else is ready (like the import queue, or a network message) then we drop the Future
returned by next()
and re-acquire it the next time next_action
is called.
If we were to turn that into a Sink
, then destroying that future would drop the element that we are trying to send.
It also means that the element that we send might already be obsolete when we actually send it.
if let Some(v) = entries.next().await { | ||
v | ||
} else { | ||
loop { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hopefully not a dumb question: What is this loop
for?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not actually sure whether this loop is necessary. I think pending!
is like std::thread::yield_now
and can return.
The fact that this CI test failed and I've never seen it fail before unfortunately doesn't inspire confidence. |
Putting on ice as per previous comment. |
Closing this for the same reason as #5763 |
Work towards paritytech/polkadot-sdk#557
Refactors the
NetworkWorker
andbuild_network_future
functions to use async/await.First of all, I've had to do three side-changes:
SmartImportQueue
to bypass its weird API which isn't really usable from async/await.status_sink
no longer has methods that require&mut self
. Instead it uses a channel to add elements on the queue, and an internalMutex
to protect the queue. That mutex is a futures-mutex and is only locked when we're polling for new events. In other words, if we callnext()
twice, only one will run at a time, but that's a very desired behaviour.NetworkWorker::from_worker
channel has been properly renamed tofrom_service
.The rest of the PR converts the
NetworkWorker::poll()
function into aNetworkWorker::next_action
async method whose role is to process the network worker, and turnsbuild_network_future
into an async function.These two big refactors were actually very straight-forward, as I just replaced the various polling with branches of the
futures::select!
macro.