Skip to content

Commit

Permalink
[refacto] keep async reader stub
Browse files Browse the repository at this point in the history
  • Loading branch information
rdettai committed Oct 4, 2021
1 parent 2ef3233 commit 042a33b
Show file tree
Hide file tree
Showing 9 changed files with 27 additions and 13 deletions.
2 changes: 1 addition & 1 deletion datafusion/src/datasource/file_format/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ impl FileFormat for AvroFormat {
.object_store_registry
.get_by_uri(&fmeta.path)?
.file_reader(fmeta)?
.reader()?;
.sync_reader()?;
let schema = read_avro_schema_from_reader(&mut reader)?;
schemas.push(schema);
}
Expand Down
2 changes: 1 addition & 1 deletion datafusion/src/datasource/file_format/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ impl FileFormat for CsvFormat {
.object_store_registry
.get_by_uri(&fmeta.path)?
.file_reader(fmeta)?
.reader()?;
.sync_reader()?;
let (schema, records_read) = arrow::csv::reader::infer_reader_schema(
&mut reader,
self.delimiter,
Expand Down
2 changes: 1 addition & 1 deletion datafusion/src/datasource/file_format/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ impl FileFormat for JsonFormat {
.object_store_registry
.get_by_uri(&fmeta.path)?
.file_reader(fmeta)?
.reader()?;
.sync_reader()?;
let mut reader = BufReader::new(reader);
let iter = ValueIter::new(&mut reader, None);
let schema = infer_json_schema_from_iterator(iter.take_while(|_| {
Expand Down
2 changes: 1 addition & 1 deletion datafusion/src/datasource/file_format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ impl ChunkReader for ChunkObjectReader {

fn get_read(&self, start: u64, length: usize) -> ParquetResult<Self::T> {
self.0
.chunk_reader(start, length)
.sync_chunk_reader(start, length)
.map_err(|e| ParquetError::ArrowError(e.to_string()))
}
}
Expand Down
14 changes: 12 additions & 2 deletions datafusion/src/datasource/object_store/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use std::io::{Read, Seek, SeekFrom};
use std::sync::Arc;

use async_trait::async_trait;
use futures::{stream, StreamExt};
use futures::{stream, AsyncRead, StreamExt};

use crate::datasource::object_store::{
ListEntryStream, ObjectReader, ObjectStore, SizedFile, SizedFileStream,
Expand Down Expand Up @@ -65,7 +65,17 @@ impl LocalFileReader {

#[async_trait]
impl ObjectReader for LocalFileReader {
fn chunk_reader(
async fn chunk_reader(
&self,
_start: u64,
_length: usize,
) -> Result<Box<dyn AsyncRead>> {
todo!(
"implement once async file readers are available (arrow-rs#78, arrow-rs#111)"
)
}

fn sync_chunk_reader(
&self,
start: u64,
length: usize,
Expand Down
12 changes: 8 additions & 4 deletions datafusion/src/datasource/object_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use std::pin::Pin;
use std::sync::{Arc, RwLock};

use async_trait::async_trait;
use futures::{Stream, StreamExt};
use futures::{AsyncRead, Stream, StreamExt};

use local::LocalFileSystem;

Expand All @@ -37,16 +37,20 @@ use crate::error::{DataFusionError, Result};
/// have some performance impacts.
#[async_trait]
pub trait ObjectReader {
/// Asynchronously get a reader for the byte range [start, start + length) of the file
async fn chunk_reader(&self, start: u64, length: usize)
-> Result<Box<dyn AsyncRead>>;

/// Get a blocking reader for the byte range [start, start + length) of the file
fn chunk_reader(
fn sync_chunk_reader(
&self,
start: u64,
length: usize,
) -> Result<Box<dyn Read + Send + Sync>>;

/// Get reader for the entire file
fn reader(&self) -> Result<Box<dyn Read + Send + Sync>> {
self.chunk_reader(0, self.length() as usize)
fn sync_reader(&self) -> Result<Box<dyn Read + Send + Sync>> {
self.sync_chunk_reader(0, self.length() as usize)
}

/// Get the size of the file
Expand Down
2 changes: 1 addition & 1 deletion datafusion/src/physical_plan/file_format/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ impl ExecutionPlan for AvroExec {
.object_store_registry
.get_by_uri(&self.files[partition].file.path)?
.file_reader(self.files[partition].file.clone())?
.reader()?;
.sync_reader()?;

let proj = self.projection.as_ref().map(|p| {
p.iter()
Expand Down
2 changes: 1 addition & 1 deletion datafusion/src/physical_plan/file_format/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ impl CsvStream<Box<dyn Read + Send + Sync>> {
let file = object_store_registry
.get_by_uri(&file.path)?
.file_reader(file.clone())?
.reader()?;
.sync_reader()?;
Self::try_new_from_reader(
file, schema, has_header, delimiter, projection, batch_size, limit,
)
Expand Down
2 changes: 1 addition & 1 deletion datafusion/src/physical_plan/file_format/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ impl ExecutionPlan for NdJsonExec {
.object_store_registry
.get_by_uri(&self.files[partition].file.path)?
.file_reader(self.files[partition].file.clone())?
.reader()?;
.sync_reader()?;

let json_reader = json::Reader::new(file, self.schema(), self.batch_size, proj);

Expand Down

0 comments on commit 042a33b

Please sign in to comment.