Skip to content

Commit

Permalink
rm buffer in block_reader
Browse files Browse the repository at this point in the history
  • Loading branch information
youngsofun committed Feb 23, 2022
1 parent 68e399f commit 3f81e17
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 10 deletions.
13 changes: 5 additions & 8 deletions query/src/storages/fuse/io/block_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ use common_tracing::tracing;
use common_tracing::tracing::debug_span;
use common_tracing::tracing::Instrument;
use futures::future::BoxFuture;
use futures::io::BufReader;
use opendal::readers::SeekableReader;
use opendal::Operator;

Expand All @@ -39,7 +38,6 @@ pub struct BlockReader {
arrow_table_schema: ArrowSchema,
projection: Vec<usize>,
file_len: u64,
read_buffer_size: u64,
metadata_reader: BlockMetaReader,
}

Expand All @@ -50,7 +48,6 @@ impl BlockReader {
table_schema: DataSchemaRef,
projection: Vec<usize>,
file_len: u64,
read_buffer_size: u64,
reader: BlockMetaReader,
) -> Self {
let block_schema = Arc::new(table_schema.project(projection.clone()));
Expand All @@ -62,7 +59,6 @@ impl BlockReader {
arrow_table_schema,
projection,
file_len,
read_buffer_size,
metadata_reader: reader,
}
}
Expand All @@ -85,7 +81,6 @@ impl BlockReader {

let arrow_fields = &self.arrow_table_schema.fields;
let stream_len = self.file_len;
let read_buffer_size = self.read_buffer_size;
let parquet_fields = metadata.schema().fields();

// read_columns_many_async use field name to filter columns
Expand All @@ -106,9 +101,11 @@ impl BlockReader {
let data_accessor = self.data_accessor.clone();
let path = self.path.clone();
Box::pin(async move {
let reader = SeekableReader::new(data_accessor, path.as_str(), stream_len);
let reader = BufReader::with_capacity(read_buffer_size as usize, reader);
Ok(reader)
Ok(SeekableReader::new(
data_accessor,
path.as_str(),
stream_len,
))
}) as BoxFuture<_>
};

Expand Down
2 changes: 0 additions & 2 deletions query/src/storages/fuse/operations/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ impl FuseTable {

let part_stream = futures::stream::iter(iter);

let read_buffer_size = ctx.get_settings().get_storage_read_buffer_size()?;
let stream = part_stream
.map(move |part| {
let da = operator.clone();
Expand All @@ -81,7 +80,6 @@ impl FuseTable {
table_schema,
projection,
part_len,
read_buffer_size,
reader,
);
block_reader.read().await.map_err(|e| {
Expand Down

0 comments on commit 3f81e17

Please sign in to comment.