From 7b1d88ecfe1553e9ff5fb0b474a25d4f85cbf2b4 Mon Sep 17 00:00:00 2001 From: sundy-li <543950155@qq.com> Date: Wed, 20 Oct 2021 22:57:34 +0800 Subject: [PATCH 1/2] ISSUE-2353 wrap progress stream in TransformSource --- query/src/datasources/table/fuse/table_do_read.rs | 4 ---- .../datasources/table/memory/memory_table_stream.rs | 11 +++-------- query/src/datasources/table_func/numbers_stream.rs | 11 +++-------- query/src/pipelines/transforms/transform_source.rs | 6 +++++- 4 files changed, 11 insertions(+), 21 deletions(-) diff --git a/query/src/datasources/table/fuse/table_do_read.rs b/query/src/datasources/table/fuse/table_do_read.rs index b856641d68a2c..3c7591cedf4bb 100644 --- a/query/src/datasources/table/fuse/table_do_read.rs +++ b/query/src/datasources/table/fuse/table_do_read.rs @@ -19,7 +19,6 @@ use common_context::IOContext; use common_context::TableIOContext; use common_exception::Result; use common_planners::Extras; -use common_streams::ProgressStream; use common_streams::SendableDataBlockStream; use futures::StreamExt; @@ -72,9 +71,6 @@ impl FuseTable { let stream = stream.then(move |part| { io::do_read(part, da.clone(), projection.clone(), arrow_schema.clone()) }); - - let progress_callback = ctx.progress_callback()?; - let stream = ProgressStream::try_create(Box::pin(stream), progress_callback)?; Ok(Box::pin(stream)) } } diff --git a/query/src/datasources/table/memory/memory_table_stream.rs b/query/src/datasources/table/memory/memory_table_stream.rs index 185f7c6fda977..5a1f8e23c063d 100644 --- a/query/src/datasources/table/memory/memory_table_stream.rs +++ b/query/src/datasources/table/memory/memory_table_stream.rs @@ -18,7 +18,6 @@ use std::usize; use common_datablocks::DataBlock; use common_exception::Result; -use common_streams::ProgressStream; use futures::stream::Stream; use crate::sessions::DatabendQueryContextRef; @@ -37,17 +36,13 @@ pub struct MemoryTableStream { } impl MemoryTableStream { - pub fn try_create( - ctx: DatabendQueryContextRef, - blocks: Vec, - ) -> Result { - let stream = Box::pin(MemoryTableStream { + pub fn try_create(ctx: DatabendQueryContextRef, blocks: Vec) -> Result { + Ok(Self { ctx: ctx.clone(), block_index: 0, block_ranges: vec![], blocks, - }); - ProgressStream::try_create(stream, ctx.progress_callback()?) + }) } fn try_get_one_block(&mut self) -> Result> { diff --git a/query/src/datasources/table_func/numbers_stream.rs b/query/src/datasources/table_func/numbers_stream.rs index 66972595f15ea..a727eb816e77e 100644 --- a/query/src/datasources/table_func/numbers_stream.rs +++ b/query/src/datasources/table_func/numbers_stream.rs @@ -19,7 +19,6 @@ use std::usize; use common_datablocks::DataBlock; use common_datavalues::prelude::*; use common_exception::Result; -use common_streams::ProgressStream; use futures::stream::Stream; use crate::sessions::DatabendQueryContextRef; @@ -38,17 +37,13 @@ pub struct NumbersStream { } impl NumbersStream { - pub fn try_create( - ctx: DatabendQueryContextRef, - schema: DataSchemaRef, - ) -> Result { - let stream = Box::pin(NumbersStream { + pub fn try_create(ctx: DatabendQueryContextRef, schema: DataSchemaRef) -> Result { + Ok(Self { ctx: ctx.clone(), schema, block_index: 0, blocks: vec![], - }); - ProgressStream::try_create(stream, ctx.progress_callback()?) + }) } #[inline] diff --git a/query/src/pipelines/transforms/transform_source.rs b/query/src/pipelines/transforms/transform_source.rs index 6a21c676e7e39..7c9a9d373df1c 100644 --- a/query/src/pipelines/transforms/transform_source.rs +++ b/query/src/pipelines/transforms/transform_source.rs @@ -19,6 +19,7 @@ use common_exception::ErrorCode; use common_exception::Result; use common_planners::ReadDataSourcePlan; use common_streams::CorrectWithSchemaStream; +use common_streams::ProgressStream; use common_streams::SendableDataBlockStream; use common_tracing::tracing; @@ -55,8 +56,11 @@ impl SourceTransform { // get_cluster_table_io_context()? let io_ctx = Arc::new(self.ctx.get_cluster_table_io_context()?); let table_stream = table.read(io_ctx, &self.source_plan.push_downs); + let progress_stream = + ProgressStream::try_create(table_stream.await?, self.ctx.progress_callback()?)?; + Ok(Box::pin( - self.ctx.try_create_abortable(table_stream.await?)?, + self.ctx.try_create_abortable(Box::pin(progress_stream))?, )) } } From d3f9b479b74ddcb17e6efb99e01073cebffccf82 Mon Sep 17 00:00:00 2001 From: sundy-li <543950155@qq.com> Date: Wed, 20 Oct 2021 23:07:02 +0800 Subject: [PATCH 2/2] Fix Lint --- query/src/datasources/table/fuse/table_do_read.rs | 1 - query/src/datasources/table/memory/memory_table_stream.rs | 2 +- query/src/datasources/table_func/numbers_stream.rs | 2 +- 3 files changed, 2 insertions(+), 3 deletions(-) diff --git a/query/src/datasources/table/fuse/table_do_read.rs b/query/src/datasources/table/fuse/table_do_read.rs index 3c7591cedf4bb..573813a00d176 100644 --- a/query/src/datasources/table/fuse/table_do_read.rs +++ b/query/src/datasources/table/fuse/table_do_read.rs @@ -56,7 +56,6 @@ impl FuseTable { // TODO we need a configuration to specify the unit of dequeue operation let bite_size = 1; let iter = { - let ctx = ctx.clone(); std::iter::from_fn(move || match ctx.clone().try_get_partitions(bite_size) { Err(_) => None, Ok(parts) if parts.is_empty() => None, diff --git a/query/src/datasources/table/memory/memory_table_stream.rs b/query/src/datasources/table/memory/memory_table_stream.rs index 5a1f8e23c063d..0fd89f2d8b504 100644 --- a/query/src/datasources/table/memory/memory_table_stream.rs +++ b/query/src/datasources/table/memory/memory_table_stream.rs @@ -38,7 +38,7 @@ pub struct MemoryTableStream { impl MemoryTableStream { pub fn try_create(ctx: DatabendQueryContextRef, blocks: Vec) -> Result { Ok(Self { - ctx: ctx.clone(), + ctx, block_index: 0, block_ranges: vec![], blocks, diff --git a/query/src/datasources/table_func/numbers_stream.rs b/query/src/datasources/table_func/numbers_stream.rs index a727eb816e77e..d6e0973effd9d 100644 --- a/query/src/datasources/table_func/numbers_stream.rs +++ b/query/src/datasources/table_func/numbers_stream.rs @@ -39,7 +39,7 @@ pub struct NumbersStream { impl NumbersStream { pub fn try_create(ctx: DatabendQueryContextRef, schema: DataSchemaRef) -> Result { Ok(Self { - ctx: ctx.clone(), + ctx, schema, block_index: 0, blocks: vec![],