Skip to content

Commit

Permalink
fix(batch): Stop polling after data stream returns None (risingwave…
Browse files Browse the repository at this point in the history
…labs#4371)

* fix(batch): Stop polling after data stream returns None

* Fix comments
  • Loading branch information
liurenjie1024 authored and nasnoisaac committed Aug 9, 2022
1 parent 5f046c7 commit 1745254
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 37 deletions.
9 changes: 9 additions & 0 deletions src/batch/src/task/broadcast_channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::fmt::{Debug, Formatter};
use std::future::Future;

use risingwave_common::array::DataChunk;
Expand All @@ -33,6 +34,14 @@ pub struct BroadcastSender {
broadcast_info: BroadcastInfo,
}

/// Debug formatting for `BroadcastSender`.
///
/// Renders the struct name together with its `broadcast_info` field.
impl Debug for BroadcastSender {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        // Build the struct representation step by step instead of chaining.
        let mut repr = f.debug_struct("BroadcastSender");
        repr.field("broadcast_info", &self.broadcast_info);
        repr.finish()
    }
}

impl ChanSender for BroadcastSender {
type SendFuture<'a> = impl Future<Output = BatchResult<()>>;

Expand Down
1 change: 1 addition & 0 deletions src/batch/src/task/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ pub(super) trait ChanSender: Send {
fn send(&mut self, chunk: Option<DataChunk>) -> Self::SendFuture<'_>;
}

#[derive(Debug)]
pub enum ChanSenderImpl {
HashShuffle(HashShuffleSender),
Fifo(FifoSender),
Expand Down
10 changes: 8 additions & 2 deletions src/batch/src/task/fifo_channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::fmt::{Debug, Formatter};
use std::future::Future;

use risingwave_common::array::DataChunk;
Expand All @@ -24,11 +25,16 @@ use crate::error::Result as BatchResult;
use crate::task::channel::{ChanReceiver, ChanReceiverImpl, ChanSender, ChanSenderImpl};
use crate::task::data_chunk_in_channel::DataChunkInChannel;
use crate::task::BOUNDED_BUFFER_SIZE;

/// Sending half of the FIFO channel.
///
/// Wraps an `mpsc::Sender` whose items are `Option<DataChunkInChannel>`.
pub struct FifoSender {
// NOTE(review): a `None` item presumably signals end of stream — confirm against the receiver.
sender: mpsc::Sender<Option<DataChunkInChannel>>,
}

/// Debug formatting for `FifoSender`.
///
/// The inner `sender` handle is intentionally not printed; the output is
/// marked as non-exhaustive (`FifoSender { .. }`) so readers of logs can tell
/// that a field was omitted rather than mistaking this for a unit struct.
impl Debug for FifoSender {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("FifoSender").finish_non_exhaustive()
    }
}

/// Receiving half of the FIFO channel.
///
/// Wraps an `mpsc::Receiver` whose items are `Option<DataChunkInChannel>`.
pub struct FifoReceiver {
// NOTE(review): a `None` item presumably signals end of stream — confirm against the sender side.
receiver: mpsc::Receiver<Option<DataChunkInChannel>>,
}
Expand All @@ -37,7 +43,7 @@ impl ChanSender for FifoSender {
type SendFuture<'a> = impl Future<Output = BatchResult<()>>;

fn send(&mut self, chunk: Option<DataChunk>) -> Self::SendFuture<'_> {
async move {
async {
self.sender
.send(chunk.map(DataChunkInChannel::new))
.await
Expand Down
11 changes: 10 additions & 1 deletion src/batch/src/task/hash_shuffle_channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::fmt::{Debug, Formatter};
use std::future::Future;
use std::ops::BitAnd;
use std::option::Option;
Expand All @@ -33,7 +34,15 @@ use crate::task::BOUNDED_BUFFER_SIZE;

pub struct HashShuffleSender {
senders: Vec<mpsc::Sender<Option<DataChunkInChannel>>>,
hash_info: exchange_info::HashInfo,
hash_info: HashInfo,
}

/// Debug formatting for `HashShuffleSender`.
///
/// Only `hash_info` is printed. The `senders` handles are skipped, and the
/// output ends with `..` to make it explicit that fields were omitted.
impl Debug for HashShuffleSender {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("HashShuffleSender")
            .field("hash_info", &self.hash_info)
            .finish_non_exhaustive()
    }
}

pub struct HashShuffleReceiver {
Expand Down
68 changes: 34 additions & 34 deletions src/batch/src/task/task_execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,54 +288,54 @@ impl<C: BatchTaskContext> BatchTaskExecution<C> {
mut shutdown_rx: Receiver<u64>,
) -> Result<()> {
let mut data_chunk_stream = root.execute();
let mut state = TaskStatus::Unspecified;
loop {
tokio::select! {
// We prioritize abort signal over normal data chunks.
biased;
_ = &mut shutdown_rx => {
if let Err(e) = sender.send(None).await {
match e {
BatchError::SenderError => {
// This can happen when the parent stage has a limit executor: it may
// stop receiving data from downstream early, which closes the
// channel.
warn!("Task receiver closed!");
break;
},
x => {
return Err(InternalError(format!("Failed to send data: {:?}", x)))?;
}
}
}
*self.state.lock() = TaskStatus::Aborted;
state = TaskStatus::Aborted;
break;
}
res = data_chunk_stream.next() => {
let data_chunk = match res {
Some(data_chunk) => Some(data_chunk?),
None => {
trace!("data chunk stream shuts down");
None
}
};

if let Err(e) = sender.send(data_chunk).await {
match e {
BatchError::SenderError => {
// This can happen when the parent stage has a limit executor: it may
// stop receiving data from downstream early, which closes the
// channel.
warn!("Task receiver closed!");
break;
},
x => {
return Err(InternalError(format!("Failed to send data: {:?}", x)))?;
if let Some(data_chunk) = res {
if let Err(e) = sender.send(Some(data_chunk?)).await {
match e {
BatchError::SenderError => {
// This can happen when the parent stage has a limit executor: it may
// stop receiving data from downstream early, which closes the
// channel.
warn!("Task receiver closed!");
break;
},
x => {
return Err(InternalError(format!("Failed to send data: {:?}", x)))?;
}
}
}
} else {
state = TaskStatus::Finished;
break;
}
}
}
}

info!("Task finished with status: {:?}", state);
*self.state.lock() = state;
if let Err(e) = sender.send(None).await {
match e {
BatchError::SenderError => {
// This can happen when the parent stage has a limit executor: it may
// stop receiving data from downstream early, which closes the
// channel.
warn!("Task receiver closed when sending None!");
}
x => {
return Err(InternalError(format!("Failed to send data: {:?}", x)))?;
}
}
}
Ok(())
}

Expand Down

0 comments on commit 1745254

Please sign in to comment.