Skip to content

Commit

Permalink
Merge pull request #2354 from sundy-li/progress_table_stream
Browse files Browse the repository at this point in the history
ISSUE-2353 wrap progress stream in TransformSource
  • Loading branch information
BohuTANG authored Oct 20, 2021
2 parents 67ab365 + d3f9b47 commit 8107725
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 24 deletions.
5 changes: 0 additions & 5 deletions query/src/datasources/table/fuse/table_do_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use common_context::IOContext;
use common_context::TableIOContext;
use common_exception::Result;
use common_planners::Extras;
use common_streams::ProgressStream;
use common_streams::SendableDataBlockStream;
use futures::StreamExt;

Expand Down Expand Up @@ -57,7 +56,6 @@ impl FuseTable {
// TODO we need a configuration to specify the unit of dequeue operation
let bite_size = 1;
let iter = {
let ctx = ctx.clone();
std::iter::from_fn(move || match ctx.clone().try_get_partitions(bite_size) {
Err(_) => None,
Ok(parts) if parts.is_empty() => None,
Expand All @@ -72,9 +70,6 @@ impl FuseTable {
let stream = stream.then(move |part| {
io::do_read(part, da.clone(), projection.clone(), arrow_schema.clone())
});

let progress_callback = ctx.progress_callback()?;
let stream = ProgressStream::try_create(Box::pin(stream), progress_callback)?;
Ok(Box::pin(stream))
}
}
13 changes: 4 additions & 9 deletions query/src/datasources/table/memory/memory_table_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use std::usize;

use common_datablocks::DataBlock;
use common_exception::Result;
use common_streams::ProgressStream;
use futures::stream::Stream;

use crate::sessions::DatabendQueryContextRef;
Expand All @@ -37,17 +36,13 @@ pub struct MemoryTableStream {
}

impl MemoryTableStream {
pub fn try_create(
ctx: DatabendQueryContextRef,
blocks: Vec<DataBlock>,
) -> Result<ProgressStream> {
let stream = Box::pin(MemoryTableStream {
ctx: ctx.clone(),
pub fn try_create(ctx: DatabendQueryContextRef, blocks: Vec<DataBlock>) -> Result<Self> {
Ok(Self {
ctx,
block_index: 0,
block_ranges: vec![],
blocks,
});
ProgressStream::try_create(stream, ctx.progress_callback()?)
})
}

fn try_get_one_block(&mut self) -> Result<Option<DataBlock>> {
Expand Down
13 changes: 4 additions & 9 deletions query/src/datasources/table_func/numbers_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use std::usize;
use common_datablocks::DataBlock;
use common_datavalues::prelude::*;
use common_exception::Result;
use common_streams::ProgressStream;
use futures::stream::Stream;

use crate::sessions::DatabendQueryContextRef;
Expand All @@ -38,17 +37,13 @@ pub struct NumbersStream {
}

impl NumbersStream {
pub fn try_create(
ctx: DatabendQueryContextRef,
schema: DataSchemaRef,
) -> Result<ProgressStream> {
let stream = Box::pin(NumbersStream {
ctx: ctx.clone(),
pub fn try_create(ctx: DatabendQueryContextRef, schema: DataSchemaRef) -> Result<Self> {
Ok(Self {
ctx,
schema,
block_index: 0,
blocks: vec![],
});
ProgressStream::try_create(stream, ctx.progress_callback()?)
})
}

#[inline]
Expand Down
6 changes: 5 additions & 1 deletion query/src/pipelines/transforms/transform_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use common_exception::ErrorCode;
use common_exception::Result;
use common_planners::ReadDataSourcePlan;
use common_streams::CorrectWithSchemaStream;
use common_streams::ProgressStream;
use common_streams::SendableDataBlockStream;
use common_tracing::tracing;

Expand Down Expand Up @@ -55,8 +56,11 @@ impl SourceTransform {
// get_cluster_table_io_context()?
let io_ctx = Arc::new(self.ctx.get_cluster_table_io_context()?);
let table_stream = table.read(io_ctx, &self.source_plan.push_downs);
let progress_stream =
ProgressStream::try_create(table_stream.await?, self.ctx.progress_callback()?)?;

Ok(Box::pin(
self.ctx.try_create_abortable(table_stream.await?)?,
self.ctx.try_create_abortable(Box::pin(progress_stream))?,
))
}
}
Expand Down

0 comments on commit 8107725

Please sign in to comment.