From 042a33bf09d88839fe5eea9a839c42e4eeddf01c Mon Sep 17 00:00:00 2001 From: Remi Dettai Date: Mon, 4 Oct 2021 09:55:17 +0200 Subject: [PATCH] [refacto] keep async reader stub --- datafusion/src/datasource/file_format/avro.rs | 2 +- datafusion/src/datasource/file_format/csv.rs | 2 +- datafusion/src/datasource/file_format/json.rs | 2 +- datafusion/src/datasource/file_format/parquet.rs | 2 +- datafusion/src/datasource/object_store/local.rs | 14 ++++++++++++-- datafusion/src/datasource/object_store/mod.rs | 12 ++++++++---- datafusion/src/physical_plan/file_format/avro.rs | 2 +- datafusion/src/physical_plan/file_format/csv.rs | 2 +- datafusion/src/physical_plan/file_format/json.rs | 2 +- 9 files changed, 27 insertions(+), 13 deletions(-) diff --git a/datafusion/src/datasource/file_format/avro.rs b/datafusion/src/datasource/file_format/avro.rs index 0e6008799271e..ac83d63296ae1 100644 --- a/datafusion/src/datasource/file_format/avro.rs +++ b/datafusion/src/datasource/file_format/avro.rs @@ -67,7 +67,7 @@ impl FileFormat for AvroFormat { .object_store_registry .get_by_uri(&fmeta.path)? .file_reader(fmeta)? - .reader()?; + .sync_reader()?; let schema = read_avro_schema_from_reader(&mut reader)?; schemas.push(schema); } diff --git a/datafusion/src/datasource/file_format/csv.rs b/datafusion/src/datasource/file_format/csv.rs index 9f8b27cb9114d..302afb4c0d9c3 100644 --- a/datafusion/src/datasource/file_format/csv.rs +++ b/datafusion/src/datasource/file_format/csv.rs @@ -95,7 +95,7 @@ impl FileFormat for CsvFormat { .object_store_registry .get_by_uri(&fmeta.path)? .file_reader(fmeta)? - .reader()?; + .sync_reader()?; let (schema, records_read) = arrow::csv::reader::infer_reader_schema( &mut reader, self.delimiter, diff --git a/datafusion/src/datasource/file_format/json.rs b/datafusion/src/datasource/file_format/json.rs index 20441b2d62c26..ffa6216cf7397 100644 --- a/datafusion/src/datasource/file_format/json.rs +++ b/datafusion/src/datasource/file_format/json.rs @@ -81,7 +81,7 @@ impl FileFormat for JsonFormat { .object_store_registry .get_by_uri(&fmeta.path)? .file_reader(fmeta)? - .reader()?; + .sync_reader()?; let mut reader = BufReader::new(reader); let iter = ValueIter::new(&mut reader, None); let schema = infer_json_schema_from_iterator(iter.take_while(|_| { diff --git a/datafusion/src/datasource/file_format/parquet.rs b/datafusion/src/datasource/file_format/parquet.rs index 99a8a890914f2..6e69651fc407f 100644 --- a/datafusion/src/datasource/file_format/parquet.rs +++ b/datafusion/src/datasource/file_format/parquet.rs @@ -337,7 +337,7 @@ impl ChunkReader for ChunkObjectReader { fn get_read(&self, start: u64, length: usize) -> ParquetResult { self.0 - .chunk_reader(start, length) + .sync_chunk_reader(start, length) .map_err(|e| ParquetError::ArrowError(e.to_string())) } } diff --git a/datafusion/src/datasource/object_store/local.rs b/datafusion/src/datasource/object_store/local.rs index 8fe46fad37aed..655e206274155 100644 --- a/datafusion/src/datasource/object_store/local.rs +++ b/datafusion/src/datasource/object_store/local.rs @@ -22,7 +22,7 @@ use std::io::{Read, Seek, SeekFrom}; use std::sync::Arc; use async_trait::async_trait; -use futures::{stream, StreamExt}; +use futures::{stream, AsyncRead, StreamExt}; use crate::datasource::object_store::{ ListEntryStream, ObjectReader, ObjectStore, SizedFile, SizedFileStream, @@ -65,7 +65,17 @@ impl LocalFileReader { #[async_trait] impl ObjectReader for LocalFileReader { - fn chunk_reader( + async fn chunk_reader( + &self, + _start: u64, + _length: usize, + ) -> Result> { + todo!( + "implement once async file readers are available (arrow-rs#78, arrow-rs#111)" + ) + } + + fn sync_chunk_reader( &self, start: u64, length: usize, diff --git a/datafusion/src/datasource/object_store/mod.rs b/datafusion/src/datasource/object_store/mod.rs index b03c795024b22..3474f691f4814 100644 --- a/datafusion/src/datasource/object_store/mod.rs +++ b/datafusion/src/datasource/object_store/mod.rs @@ -26,7 +26,7 @@ use std::pin::Pin; use std::sync::{Arc, RwLock}; use async_trait::async_trait; -use futures::{Stream, StreamExt}; +use futures::{AsyncRead, Stream, StreamExt}; use local::LocalFileSystem; @@ -37,16 +37,20 @@ use crate::error::{DataFusionError, Result}; /// have some performance impacts. #[async_trait] pub trait ObjectReader { + /// Get reader for a part [start, start + length] in the file asynchronously + async fn chunk_reader(&self, start: u64, length: usize) + -> Result>; + /// Get reader for a part [start, start + length] in the file - fn chunk_reader( + fn sync_chunk_reader( &self, start: u64, length: usize, ) -> Result>; /// Get reader for the entire file - fn reader(&self) -> Result> { - self.chunk_reader(0, self.length() as usize) + fn sync_reader(&self) -> Result> { + self.sync_chunk_reader(0, self.length() as usize) } /// Get the size of the file diff --git a/datafusion/src/physical_plan/file_format/avro.rs b/datafusion/src/physical_plan/file_format/avro.rs index e20ba3afe7a86..65b5b6e4b4e9f 100644 --- a/datafusion/src/physical_plan/file_format/avro.rs +++ b/datafusion/src/physical_plan/file_format/avro.rs @@ -129,7 +129,7 @@ impl ExecutionPlan for AvroExec { .object_store_registry .get_by_uri(&self.files[partition].file.path)? .file_reader(self.files[partition].file.clone())? - .reader()?; + .sync_reader()?; let proj = self.projection.as_ref().map(|p| { p.iter() diff --git a/datafusion/src/physical_plan/file_format/csv.rs b/datafusion/src/physical_plan/file_format/csv.rs index d7392579774ca..5aa7e62a112a0 100644 --- a/datafusion/src/physical_plan/file_format/csv.rs +++ b/datafusion/src/physical_plan/file_format/csv.rs @@ -196,7 +196,7 @@ impl CsvStream> { let file = object_store_registry .get_by_uri(&file.path)? .file_reader(file.clone())? - .reader()?; + .sync_reader()?; Self::try_new_from_reader( file, schema, has_header, delimiter, projection, batch_size, limit, ) diff --git a/datafusion/src/physical_plan/file_format/json.rs b/datafusion/src/physical_plan/file_format/json.rs index 1d02f5fca3b24..7a81a75fadc5e 100644 --- a/datafusion/src/physical_plan/file_format/json.rs +++ b/datafusion/src/physical_plan/file_format/json.rs @@ -129,7 +129,7 @@ impl ExecutionPlan for NdJsonExec { .object_store_registry .get_by_uri(&self.files[partition].file.path)? .file_reader(self.files[partition].file.clone())? - .reader()?; + .sync_reader()?; let json_reader = json::Reader::new(file, self.schema(), self.batch_size, proj);