diff --git a/hydroflow_cli_integration/src/lib.rs b/hydroflow_cli_integration/src/lib.rs index 80523dca51ab..7c27b4a3cc8b 100644 --- a/hydroflow_cli_integration/src/lib.rs +++ b/hydroflow_cli_integration/src/lib.rs @@ -12,7 +12,7 @@ use std::{ use bytes::{Bytes, BytesMut}; use serde::{Deserialize, Serialize}; -use futures::{ready, stream, Sink, Stream}; +use futures::{ready, sink::Buffer, stream, Sink, SinkExt, Stream}; use async_recursion::async_recursion; use async_trait::async_trait; @@ -395,9 +395,14 @@ impl ConnectedSink for ConnectedBidi { } } -pub struct ConnectedDemux { +pub type BufferedDrain = DemuxDrain>; + +pub struct ConnectedDemux +where + ::Input: Sync, +{ pub keys: Vec, - sink: Option>, + sink: Option>, } #[pin_project] @@ -460,7 +465,12 @@ where for (id, pipe) in demux { connected_demux.insert( id, - Box::pin(T::from_defn(ServerOrBound::Server(pipe)).await.into_sink()), + Box::pin( + T::from_defn(ServerOrBound::Server(pipe)) + .await + .into_sink() + .buffer(1024), + ), ); } @@ -481,7 +491,12 @@ where for (id, bound) in demux { connected_demux.insert( id, - Box::pin(T::from_defn(ServerOrBound::Bound(bound)).await.into_sink()), + Box::pin( + T::from_defn(ServerOrBound::Bound(bound)) + .await + .into_sink() + .buffer(1024), + ), ); } @@ -505,7 +520,7 @@ where ::Input: 'static + Sync, { type Input = (u32, T::Input); - type Sink = DemuxDrain; + type Sink = BufferedDrain; fn into_sink(mut self) -> Self::Sink { self.sink.take().unwrap()