Skip to content

Commit

Permalink
Merge pull request #4230 from youngsofun/parquet2
Browse files Browse the repository at this point in the history
remove buffer in block_reader of fuse store
  • Loading branch information
mergify[bot] authored Feb 24, 2022
2 parents 3c54a84 + 7373fc5 commit f9971bd
Show file tree
Hide file tree
Showing 3 changed files with 2 additions and 12 deletions.
1 change: 0 additions & 1 deletion common/streams/src/stream_limit_by.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ impl LimitByStream {
let array = BooleanArray::from_data(ArrowType::Boolean, filter.into(), None);
let chunk = block.clone().try_into()?;
let chunk = arrow::compute::filter::filter_chunk(&chunk, &array)?;
//let chunk = chunk.into();
Some(DataBlock::from_chunk(block.schema(), &chunk)).transpose()
}
}
Expand Down
11 changes: 2 additions & 9 deletions query/src/storages/fuse/io/block_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ use common_tracing::tracing;
use common_tracing::tracing::debug_span;
use common_tracing::tracing::Instrument;
use futures::future::BoxFuture;
use futures::io::BufReader;
use opendal::Operator;

use crate::storages::fuse::io::meta_readers::BlockMetaReader;
Expand All @@ -38,7 +37,6 @@ pub struct BlockReader {
arrow_table_schema: ArrowSchema,
projection: Vec<usize>,
file_len: u64,
read_buffer_size: u64,
metadata_reader: BlockMetaReader,
}

Expand All @@ -49,7 +47,6 @@ impl BlockReader {
table_schema: DataSchemaRef,
projection: Vec<usize>,
file_len: u64,
read_buffer_size: u64,
reader: BlockMetaReader,
) -> Self {
let block_schema = Arc::new(table_schema.project(projection.clone()));
Expand All @@ -61,7 +58,6 @@ impl BlockReader {
arrow_table_schema,
projection,
file_len,
read_buffer_size,
metadata_reader: reader,
}
}
Expand All @@ -84,7 +80,6 @@ impl BlockReader {

let arrow_fields = &self.arrow_table_schema.fields;
let stream_len = self.file_len;
let read_buffer_size = self.read_buffer_size;
let parquet_fields = metadata.schema().fields();

// read_columns_many_async use field name to filter columns
Expand All @@ -105,12 +100,10 @@ impl BlockReader {
let data_accessor = self.data_accessor.clone();
let path = self.path.clone();
Box::pin(async move {
let reader = data_accessor
Ok(data_accessor
.object(path.as_str())
.reader()
.total_size(stream_len);
let reader = BufReader::with_capacity(read_buffer_size as usize, reader);
Ok(reader)
.total_size(stream_len))
}) as BoxFuture<_>
};

Expand Down
2 changes: 0 additions & 2 deletions query/src/storages/fuse/operations/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ impl FuseTable {

let part_stream = futures::stream::iter(iter);

let read_buffer_size = ctx.get_settings().get_storage_read_buffer_size()?;
let stream = part_stream
.map(move |part| {
let da = operator.clone();
Expand All @@ -81,7 +80,6 @@ impl FuseTable {
table_schema,
projection,
part_len,
read_buffer_size,
reader,
);
block_reader.read().await.map_err(|e| {
Expand Down

0 comments on commit f9971bd

Please sign in to comment.