Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: drop channel if partition receiver finished #17037

Merged
merged 3 commits into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 19 additions & 1 deletion src/query/pipeline/sources/src/async_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ pub trait AsyncSource: Send {
fn un_reacted(&self) -> Result<()> {
Ok(())
}

#[async_backtrace::framed]
async fn on_finish(&mut self) -> Result<()> {
Ok(())
}
}

// TODO: This can be refactored using proc macros
Expand All @@ -50,6 +55,7 @@ pub struct AsyncSourcer<T: 'static + AsyncSource> {
output: Arc<OutputPort>,
scan_progress: Arc<Progress>,
generated_data: Option<DataBlock>,
called_on_finish: bool,
}

impl<T: 'static + AsyncSource> AsyncSourcer<T> {
Expand All @@ -65,6 +71,7 @@ impl<T: 'static + AsyncSource> AsyncSourcer<T> {
scan_progress,
is_finish: false,
generated_data: None,
called_on_finish: false,
})))
}
}
Expand All @@ -81,12 +88,16 @@ impl<T: 'static + AsyncSource> Processor for AsyncSourcer<T> {

fn event(&mut self) -> Result<Event> {
if self.is_finish {
if !self.called_on_finish {
return Ok(Event::Async);
}
self.output.finish();
return Ok(Event::Finished);
}

if self.output.is_finished() {
return Ok(Event::Finished);
self.is_finish = true;
return Ok(Event::Async);
}

if !self.output.can_push() {
Expand All @@ -112,6 +123,13 @@ impl<T: 'static + AsyncSource> Processor for AsyncSourcer<T> {

#[async_backtrace::framed]
async fn async_process(&mut self) -> Result<()> {
if self.is_finish {
if !self.called_on_finish {
self.called_on_finish = true;
self.inner.on_finish().await?;
}
return Ok(());
}
match self.inner.generate().await? {
None => self.is_finish = true,
Some(data_block) => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use databend_common_pipeline_sources::AsyncSourcer;
use crate::operations::read::block_partition_meta::BlockPartitionMeta;

pub struct BlockPartitionReceiverSource {
pub meta_receiver: Receiver<Result<PartInfoPtr>>,
pub meta_receiver: Option<Receiver<Result<PartInfoPtr>>>,
}

impl BlockPartitionReceiverSource {
Expand All @@ -37,7 +37,7 @@ impl BlockPartitionReceiverSource {
output_port: Arc<OutputPort>,
) -> Result<ProcessorPtr> {
AsyncSourcer::create(ctx, output_port, Self {
meta_receiver: receiver,
meta_receiver: Some(receiver),
})
}
}
Expand All @@ -49,18 +49,28 @@ impl AsyncSource for BlockPartitionReceiverSource {

#[async_backtrace::framed]
async fn generate(&mut self) -> Result<Option<DataBlock>> {
match self.meta_receiver.recv().await {
Ok(Ok(part)) => Ok(Some(DataBlock::empty_with_meta(
BlockPartitionMeta::create(vec![part]),
))),
Ok(Err(e)) => Err(
// The error is occurred in pruning process
e,
),
Err(_) => {
// The channel is closed, we should return None to stop generating
Ok(None)
if let Some(rx) = &self.meta_receiver {
match rx.recv().await {
Ok(Ok(part)) => Ok(Some(DataBlock::empty_with_meta(
BlockPartitionMeta::create(vec![part]),
))),
Ok(Err(e)) => Err(
// The error is occurred in pruning process
e,
),
Err(_) => {
// The channel is closed, we should return None to stop generating
Ok(None)
}
}
} else {
Ok(None)
}
}

#[async_backtrace::framed]
async fn on_finish(&mut self) -> Result<()> {
drop(self.meta_receiver.take());
Ok(())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,9 @@ impl AsyncSink for SendPartInfoSink {

for info in info_ptr {
if let Some(sender) = &self.sender {
let _ = sender.send(Ok(info)).await;
if let Err(_e) = sender.send(Ok(info)).await {
break;
}
}
}

Expand Down
Loading