diff --git a/query/src/storages/fuse/io/block_reader.rs b/query/src/storages/fuse/io/block_reader.rs index 4aea7841df49e..baf265853fd6f 100644 --- a/query/src/storages/fuse/io/block_reader.rs +++ b/query/src/storages/fuse/io/block_reader.rs @@ -18,6 +18,7 @@ use common_arrow::arrow::datatypes::Field; use common_arrow::arrow::datatypes::Schema as ArrowSchema; use common_arrow::arrow::io::parquet::read::read_columns_many_async; use common_arrow::arrow::io::parquet::read::RowGroupDeserializer; +use common_base::tokio::sync::Semaphore; use common_datablocks::DataBlock; use common_datavalues::prelude::*; use common_exception::ErrorCode; @@ -28,6 +29,7 @@ use common_tracing::tracing::Instrument; use futures::future::BoxFuture; use opendal::Operator; +use super::parallel_async_reader::ParallelAsyncReader; use crate::storages::fuse::io::meta_readers::BlockMetaReader; pub struct BlockReader { @@ -96,14 +98,21 @@ impl BlockReader { }) .collect(); + // todo(youngsofun): make this a config + let semaphore = Arc::new(Semaphore::new(2)); let factory = || { let data_accessor = self.data_accessor.clone(); let path = self.path.clone(); + let semaphore = semaphore.clone(); Box::pin(async move { - Ok(data_accessor - .object(path.as_str()) - .reader() - .total_size(stream_len)) + let permit = semaphore.clone().acquire_owned().await.unwrap(); + Ok(ParallelAsyncReader::new( + permit, + data_accessor + .object(path.as_str()) + .reader() + .total_size(stream_len), + )) }) as BoxFuture<_> }; diff --git a/query/src/storages/fuse/io/mod.rs b/query/src/storages/fuse/io/mod.rs index 9c931a7180a32..630bbad618a28 100644 --- a/query/src/storages/fuse/io/mod.rs +++ b/query/src/storages/fuse/io/mod.rs @@ -17,6 +17,7 @@ mod block_stream_writer; mod block_writer; mod locations; mod meta_readers; +mod parallel_async_reader; pub use block_reader::BlockReader; pub use block_stream_writer::BlockRegulator; diff --git a/query/src/storages/fuse/io/parallel_async_reader.rs b/query/src/storages/fuse/io/parallel_async_reader.rs new file mode 100644 index 0000000000000..ebe78e62ff04d --- /dev/null +++ b/query/src/storages/fuse/io/parallel_async_reader.rs @@ -0,0 +1,81 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::io::IoSliceMut; +use std::io::SeekFrom; +use std::ops::Deref; +use std::ops::DerefMut; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; + +use common_base::tokio::sync::OwnedSemaphorePermit; +use futures::AsyncRead; +use futures::AsyncSeek; + +pub struct ParallelAsyncReader { + _permit: OwnedSemaphorePermit, + value: T, +} + +impl ParallelAsyncReader { + pub fn new(permit: OwnedSemaphorePermit, value: T) -> Self { + ParallelAsyncReader { + _permit: permit, + value, + } + } +} + +impl Deref for ParallelAsyncReader { + type Target = T; + + fn deref(&self) -> &Self::Target { + &self.value + } +} + +impl DerefMut for ParallelAsyncReader { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.value + } +} + +impl AsyncRead for ParallelAsyncReader { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + Pin::new(&mut **self).poll_read(cx, buf) + } + + fn poll_read_vectored( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &mut [IoSliceMut<'_>], + ) -> Poll> { + Pin::new(&mut **self).poll_read_vectored(cx, bufs) + } +} + +impl AsyncSeek for ParallelAsyncReader { + fn poll_seek( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + pos: SeekFrom, + ) -> Poll> { + Pin::new(&mut **self).poll_seek(cx, pos) + } +}