Skip to content

Commit

Permalink
Place a buffer over each sink of a demux to avoid serial message send…
Browse files Browse the repository at this point in the history
…ing (#575)
  • Loading branch information
shadaj authored Apr 13, 2023
1 parent c4f3f97 commit a26f759
Showing 1 changed file with 21 additions and 6 deletions.
27 changes: 21 additions & 6 deletions hydroflow_cli_integration/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use std::{
use bytes::{Bytes, BytesMut};
use serde::{Deserialize, Serialize};

use futures::{ready, stream, Sink, Stream};
use futures::{ready, sink::Buffer, stream, Sink, SinkExt, Stream};

use async_recursion::async_recursion;
use async_trait::async_trait;
Expand Down Expand Up @@ -395,9 +395,14 @@ impl ConnectedSink for ConnectedBidi {
}
}

pub struct ConnectedDemux<T: ConnectedSink> {
pub type BufferedDrain<S, I> = DemuxDrain<I, Buffer<S, I>>;

pub struct ConnectedDemux<T: ConnectedSink>
where
<T as ConnectedSink>::Input: Sync,
{
pub keys: Vec<u32>,
sink: Option<DemuxDrain<T::Input, T::Sink>>,
sink: Option<BufferedDrain<T::Sink, T::Input>>,
}

#[pin_project]
Expand Down Expand Up @@ -460,7 +465,12 @@ where
for (id, pipe) in demux {
connected_demux.insert(
id,
Box::pin(T::from_defn(ServerOrBound::Server(pipe)).await.into_sink()),
Box::pin(
T::from_defn(ServerOrBound::Server(pipe))
.await
.into_sink()
.buffer(1024),
),
);
}

Expand All @@ -481,7 +491,12 @@ where
for (id, bound) in demux {
connected_demux.insert(
id,
Box::pin(T::from_defn(ServerOrBound::Bound(bound)).await.into_sink()),
Box::pin(
T::from_defn(ServerOrBound::Bound(bound))
.await
.into_sink()
.buffer(1024),
),
);
}

Expand All @@ -505,7 +520,7 @@ where
<T as ConnectedSink>::Input: 'static + Sync,
{
type Input = (u32, T::Input);
type Sink = DemuxDrain<T::Input, T::Sink>;
type Sink = BufferedDrain<T::Sink, T::Input>;

fn into_sink(mut self) -> Self::Sink {
self.sink.take().unwrap()
Expand Down

0 comments on commit a26f759

Please sign in to comment.