diff --git a/query/src/datasources/table/fuse/table_do_read.rs b/query/src/datasources/table/fuse/table_do_read.rs index b856641d68a2c..573813a00d176 100644 --- a/query/src/datasources/table/fuse/table_do_read.rs +++ b/query/src/datasources/table/fuse/table_do_read.rs @@ -19,7 +19,6 @@ use common_context::IOContext; use common_context::TableIOContext; use common_exception::Result; use common_planners::Extras; -use common_streams::ProgressStream; use common_streams::SendableDataBlockStream; use futures::StreamExt; @@ -57,7 +56,6 @@ impl FuseTable { // TODO we need a configuration to specify the unit of dequeue operation let bite_size = 1; let iter = { - let ctx = ctx.clone(); std::iter::from_fn(move || match ctx.clone().try_get_partitions(bite_size) { Err(_) => None, Ok(parts) if parts.is_empty() => None, @@ -72,9 +70,6 @@ impl FuseTable { let stream = stream.then(move |part| { io::do_read(part, da.clone(), projection.clone(), arrow_schema.clone()) }); - - let progress_callback = ctx.progress_callback()?; - let stream = ProgressStream::try_create(Box::pin(stream), progress_callback)?; Ok(Box::pin(stream)) } } diff --git a/query/src/datasources/table/memory/memory_table_stream.rs b/query/src/datasources/table/memory/memory_table_stream.rs index 185f7c6fda977..0fd89f2d8b504 100644 --- a/query/src/datasources/table/memory/memory_table_stream.rs +++ b/query/src/datasources/table/memory/memory_table_stream.rs @@ -18,7 +18,6 @@ use std::usize; use common_datablocks::DataBlock; use common_exception::Result; -use common_streams::ProgressStream; use futures::stream::Stream; use crate::sessions::DatabendQueryContextRef; @@ -37,17 +36,13 @@ pub struct MemoryTableStream { } impl MemoryTableStream { - pub fn try_create( - ctx: DatabendQueryContextRef, - blocks: Vec, - ) -> Result { - let stream = Box::pin(MemoryTableStream { - ctx: ctx.clone(), + pub fn try_create(ctx: DatabendQueryContextRef, blocks: Vec) -> Result { + Ok(Self { + ctx, block_index: 0, block_ranges: vec![], blocks, - }); - ProgressStream::try_create(stream, ctx.progress_callback()?) + }) } fn try_get_one_block(&mut self) -> Result> { diff --git a/query/src/datasources/table_func/numbers_stream.rs b/query/src/datasources/table_func/numbers_stream.rs index 66972595f15ea..d6e0973effd9d 100644 --- a/query/src/datasources/table_func/numbers_stream.rs +++ b/query/src/datasources/table_func/numbers_stream.rs @@ -19,7 +19,6 @@ use std::usize; use common_datablocks::DataBlock; use common_datavalues::prelude::*; use common_exception::Result; -use common_streams::ProgressStream; use futures::stream::Stream; use crate::sessions::DatabendQueryContextRef; @@ -38,17 +37,13 @@ pub struct NumbersStream { } impl NumbersStream { - pub fn try_create( - ctx: DatabendQueryContextRef, - schema: DataSchemaRef, - ) -> Result { - let stream = Box::pin(NumbersStream { - ctx: ctx.clone(), + pub fn try_create(ctx: DatabendQueryContextRef, schema: DataSchemaRef) -> Result { + Ok(Self { + ctx, schema, block_index: 0, blocks: vec![], - }); - ProgressStream::try_create(stream, ctx.progress_callback()?) + }) } #[inline] diff --git a/query/src/pipelines/transforms/transform_source.rs b/query/src/pipelines/transforms/transform_source.rs index 6a21c676e7e39..7c9a9d373df1c 100644 --- a/query/src/pipelines/transforms/transform_source.rs +++ b/query/src/pipelines/transforms/transform_source.rs @@ -19,6 +19,7 @@ use common_exception::ErrorCode; use common_exception::Result; use common_planners::ReadDataSourcePlan; use common_streams::CorrectWithSchemaStream; +use common_streams::ProgressStream; use common_streams::SendableDataBlockStream; use common_tracing::tracing; @@ -55,8 +56,11 @@ impl SourceTransform { // get_cluster_table_io_context()? let io_ctx = Arc::new(self.ctx.get_cluster_table_io_context()?); let table_stream = table.read(io_ctx, &self.source_plan.push_downs); + let progress_stream = + ProgressStream::try_create(table_stream.await?, self.ctx.progress_callback()?)?; + Ok(Box::pin( - self.ctx.try_create_abortable(table_stream.await?)?, + self.ctx.try_create_abortable(Box::pin(progress_stream))?, )) } }