Merge pull request #1 from rdettai/refacto-providers-store
Refacto providers store
rdettai authored Oct 6, 2021
2 parents 4d92231 + 8e8fd98 commit 06155d1
Showing 13 changed files with 974 additions and 419 deletions.
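At a high level, this refactor moves the `FileFormat` trait from working with file path strings to working with object-store readers, and collapses the long `create_physical_plan` argument list into a single config struct. Below is a sketch of the new surface, reconstructed only from the signatures visible in the two diffs that follow; the trait itself lives in a file not shown here, so the exact bounds, field order, and the `ObjectStore` trait name are assumptions:

// Hypothetical reconstruction; only the signatures exercised in this diff
// are certain.
#[async_trait]
pub trait FileFormat: Send + Sync {
    // Infer a merged schema from a stream of object readers (previously a
    // stream of path strings that each implementation had to open itself).
    async fn infer_schema(&self, readers: ObjectReaderStream) -> Result<SchemaRef>;
    // Infer statistics for a single object (previously took a &str path).
    async fn infer_stats(&self, reader: Arc<dyn ObjectReader>) -> Result<Statistics>;
    // All scan parameters are now bundled into one struct.
    async fn create_physical_plan(
        &self,
        conf: PhysicalPlanConfig,
    ) -> Result<Arc<dyn ExecutionPlan>>;
}

// Fields as constructed by the test code in avro.rs below; the real
// definition may differ in order and visibility.
pub struct PhysicalPlanConfig {
    pub object_store: Arc<dyn ObjectStore>,
    pub schema: SchemaRef,
    pub files: Vec<Vec<PartitionedFile>>,
    pub statistics: Statistics,
    pub projection: Option<Vec<usize>>,
    pub batch_size: usize,
    pub filters: Vec<Expr>,
    pub limit: Option<usize>,
}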
datafusion/src/avro_to_arrow/mod.rs (6 changes: 3 additions & 3 deletions)

@@ -28,19 +28,19 @@ use crate::arrow::datatypes::Schema;
 use crate::error::Result;
 #[cfg(feature = "avro")]
 pub use reader::{Reader, ReaderBuilder};
-use std::io::{Read, Seek};
+use std::io::Read;
 
 #[cfg(feature = "avro")]
 /// Read Avro schema given a reader
-pub fn read_avro_schema_from_reader<R: Read + Seek>(reader: &mut R) -> Result<Schema> {
+pub fn read_avro_schema_from_reader<R: Read>(reader: &mut R) -> Result<Schema> {
     let avro_reader = avro_rs::Reader::new(reader)?;
     let schema = avro_reader.writer_schema();
     schema::to_arrow_schema(schema)
 }
 
 #[cfg(not(feature = "avro"))]
 /// Read Avro schema given a reader (requires the avro feature)
-pub fn read_avro_schema_from_reader<R: Read + Seek>(_: &mut R) -> Result<Schema> {
+pub fn read_avro_schema_from_reader<R: Read>(_: &mut R) -> Result<Schema> {
     Err(crate::error::DataFusionError::NotImplemented(
         "cannot read avro schema without the 'avro' feature enabled".to_string(),
     ))
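The only substantive change in this file is relaxing the reader bound from `Read + Seek` to `Read`: the Avro container header is consumed sequentially, so, as this change shows, seeking was never required. A minimal standalone sketch of what the relaxed bound buys; the `read_all` helper is hypothetical, standing in for `read_avro_schema_from_reader`:

use std::io::Read;

// Stands in for read_avro_schema_from_reader: it needs only `Read` now.
fn read_all<R: Read>(reader: &mut R) -> std::io::Result<Vec<u8>> {
    let mut buf = Vec::new();
    reader.read_to_end(&mut buf)?;
    Ok(buf)
}

fn main() -> std::io::Result<()> {
    // `std::io::Chain` implements `Read` but not `Seek`, so a chained
    // reader like this would have been rejected by the old bound.
    let head: &[u8] = b"Obj\x01"; // Avro container magic, for illustration
    let tail: &[u8] = b"rest-of-file";
    let mut chained = head.chain(tail);
    let bytes = read_all(&mut chained)?;
    assert_eq!(&bytes[..4], b"Obj\x01");
    Ok(())
}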
datafusion/src/datasource/file_format/avro.rs (111 changes: 69 additions & 42 deletions)
@@ -23,55 +23,48 @@ use arrow::datatypes::Schema;
 use arrow::{self, datatypes::SchemaRef};
 use async_trait::async_trait;
 use futures::StreamExt;
-use std::fs::File;
 
-use super::PartitionedFile;
-use super::{FileFormat, StringStream};
+use super::{FileFormat, PhysicalPlanConfig};
 use crate::avro_to_arrow::read_avro_schema_from_reader;
+use crate::datasource::object_store::{ObjectReader, ObjectReaderStream};
 use crate::error::Result;
-use crate::logical_plan::Expr;
 use crate::physical_plan::file_format::AvroExec;
 use crate::physical_plan::ExecutionPlan;
 use crate::physical_plan::Statistics;
 
-/// Line-delimited Avro `FileFormat` implementation.
-pub struct AvroFormat {}
+/// Avro `FileFormat` implementation.
+pub struct AvroFormat;
 
 #[async_trait]
 impl FileFormat for AvroFormat {
-    async fn infer_schema(&self, mut paths: StringStream) -> Result<SchemaRef> {
+    async fn infer_schema(&self, mut readers: ObjectReaderStream) -> Result<SchemaRef> {
         let mut schemas = vec![];
-        while let Some(filename) = paths.next().await {
-            let mut file = File::open(filename)?;
-            let schema = read_avro_schema_from_reader(&mut file)?;
+        while let Some(obj_reader) = readers.next().await {
+            let mut reader = obj_reader?.sync_reader()?;
+            let schema = read_avro_schema_from_reader(&mut reader)?;
             schemas.push(schema);
         }
         let merged_schema = Schema::try_merge(schemas)?;
         Ok(Arc::new(merged_schema))
     }
 
-    async fn infer_stats(&self, _path: &str) -> Result<Statistics> {
+    async fn infer_stats(&self, _reader: Arc<dyn ObjectReader>) -> Result<Statistics> {
         Ok(Statistics::default())
     }
 
     async fn create_physical_plan(
         &self,
-        schema: SchemaRef,
-        files: Vec<Vec<PartitionedFile>>,
-        statistics: Statistics,
-        projection: &Option<Vec<usize>>,
-        batch_size: usize,
-        _filters: &[Expr],
-        limit: Option<usize>,
+        conf: PhysicalPlanConfig,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let exec = AvroExec::new(
+            conf.object_store,
             // flattening this for now because CsvExec does not support partitioning yet
-            files.into_iter().flatten().map(|f| f.path).collect(),
-            statistics,
-            schema,
-            projection.clone(),
-            batch_size,
-            limit,
+            conf.files.into_iter().flatten().collect(),
+            conf.statistics,
+            conf.schema,
+            conf.projection,
+            conf.batch_size,
+            conf.limit,
         );
         Ok(Arc::new(exec))
     }
@@ -80,8 +73,16 @@ impl FileFormat for AvroFormat {
 #[cfg(test)]
 #[cfg(feature = "avro")]
 mod tests {
-    use crate::datasource::file_format::string_stream;
-    use crate::physical_plan::collect;
+    use crate::{
+        datasource::{
+            file_format::PartitionedFile,
+            object_store::local::{
+                local_file_meta, local_object_reader, local_object_reader_stream,
+                LocalFileSystem,
+            },
+        },
+        physical_plan::collect,
+    };
 
     use super::*;
     use arrow::array::{
@@ -93,10 +94,10 @@ mod tests {
     #[tokio::test]
     async fn read_small_batches() -> Result<()> {
         let projection = None;
-        let exec = get_exec("alltypes_plain.avro", &projection, 2).await?;
+        let exec = get_exec("alltypes_plain.avro", &projection, 2, None).await?;
         let stream = exec.execute(0).await?;
 
-        let _ = stream
+        let tt_batches = stream
             .map(|batch| {
                 let batch = batch.unwrap();
                 assert_eq!(11, batch.num_columns());
@@ -105,13 +106,27 @@ mod tests {
             .fold(0, |acc, _| async move { acc + 1i32 })
             .await;
 
+        assert_eq!(tt_batches, 4 /* 8/2 */);
+
         Ok(())
     }
 
+    #[tokio::test]
+    async fn read_limit() -> Result<()> {
+        let projection = None;
+        let exec = get_exec("alltypes_plain.avro", &projection, 1024, Some(1)).await?;
+        let batches = collect(exec).await?;
+        assert_eq!(1, batches.len());
+        assert_eq!(11, batches[0].num_columns());
+        assert_eq!(1, batches[0].num_rows());
+
+        Ok(())
+    }
+
     #[tokio::test]
     async fn read_alltypes_plain_avro() -> Result<()> {
         let projection = None;
-        let exec = get_exec("alltypes_plain.avro", &projection, 1024).await?;
+        let exec = get_exec("alltypes_plain.avro", &projection, 1024, None).await?;
 
         let x: Vec<String> = exec
             .schema()
@@ -161,7 +176,7 @@
     #[tokio::test]
     async fn read_bool_alltypes_plain_avro() -> Result<()> {
         let projection = Some(vec![1]);
-        let exec = get_exec("alltypes_plain.avro", &projection, 1024).await?;
+        let exec = get_exec("alltypes_plain.avro", &projection, 1024, None).await?;
 
         let batches = collect(exec).await?;
         assert_eq!(batches.len(), 1);
@@ -189,7 +204,7 @@
     #[tokio::test]
     async fn read_i32_alltypes_plain_avro() -> Result<()> {
         let projection = Some(vec![0]);
-        let exec = get_exec("alltypes_plain.avro", &projection, 1024).await?;
+        let exec = get_exec("alltypes_plain.avro", &projection, 1024, None).await?;
 
         let batches = collect(exec).await?;
         assert_eq!(batches.len(), 1);
@@ -214,7 +229,7 @@
     #[tokio::test]
     async fn read_i96_alltypes_plain_avro() -> Result<()> {
         let projection = Some(vec![10]);
-        let exec = get_exec("alltypes_plain.avro", &projection, 1024).await?;
+        let exec = get_exec("alltypes_plain.avro", &projection, 1024, None).await?;
 
         let batches = collect(exec).await?;
         assert_eq!(batches.len(), 1);
@@ -239,7 +254,7 @@
     #[tokio::test]
     async fn read_f32_alltypes_plain_avro() -> Result<()> {
         let projection = Some(vec![6]);
-        let exec = get_exec("alltypes_plain.avro", &projection, 1024).await?;
+        let exec = get_exec("alltypes_plain.avro", &projection, 1024, None).await?;
 
         let batches = collect(exec).await?;
         assert_eq!(batches.len(), 1);
@@ -267,7 +282,7 @@
     #[tokio::test]
     async fn read_f64_alltypes_plain_avro() -> Result<()> {
         let projection = Some(vec![7]);
-        let exec = get_exec("alltypes_plain.avro", &projection, 1024).await?;
+        let exec = get_exec("alltypes_plain.avro", &projection, 1024, None).await?;
 
         let batches = collect(exec).await?;
         assert_eq!(batches.len(), 1);
@@ -295,7 +310,7 @@
     #[tokio::test]
     async fn read_binary_alltypes_plain_avro() -> Result<()> {
         let projection = Some(vec![9]);
-        let exec = get_exec("alltypes_plain.avro", &projection, 1024).await?;
+        let exec = get_exec("alltypes_plain.avro", &projection, 1024, None).await?;
 
         let batches = collect(exec).await?;
         assert_eq!(batches.len(), 1);
@@ -324,21 +339,33 @@
         file_name: &str,
         projection: &Option<Vec<usize>>,
         batch_size: usize,
+        limit: Option<usize>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let testdata = crate::test_util::arrow_test_data();
         let filename = format!("{}/avro/{}", testdata, file_name);
         let format = AvroFormat {};
         let schema = format
-            .infer_schema(string_stream(vec![filename.clone()]))
+            .infer_schema(local_object_reader_stream(vec![filename.clone()]))
             .await
            .expect("Schema inference");
-        let stats = format
-            .infer_stats(&filename)
+        let statistics = format
+            .infer_stats(local_object_reader(filename.clone()))
             .await
             .expect("Stats inference");
-        let files = vec![vec![PartitionedFile { path: filename }]];
+        let files = vec![vec![PartitionedFile {
+            file_meta: local_file_meta(filename.to_owned()),
+        }]];
         let exec = format
-            .create_physical_plan(schema, files, stats, projection, batch_size, &[], None)
+            .create_physical_plan(PhysicalPlanConfig {
+                object_store: Arc::new(LocalFileSystem {}),
+                schema,
+                files,
+                statistics,
+                projection: projection.clone(),
+                batch_size,
+                filters: vec![],
+                limit,
+            })
             .await?;
         Ok(exec)
     }
@@ -349,15 +376,15 @@
 mod tests {
     use super::*;
 
-    use crate::datasource::file_format::string_stream;
+    use crate::datasource::object_store::local::local_object_reader_stream;
     use crate::error::DataFusionError;
 
     #[tokio::test]
     async fn test() -> Result<()> {
         let testdata = crate::test_util::arrow_test_data();
         let filename = format!("{}/avro/alltypes_plain.avro", testdata);
         let schema_result = AvroFormat {}
-            .infer_schema(string_stream(vec![filename]))
+            .infer_schema(local_object_reader_stream(vec![filename]))
             .await;
         assert!(matches!(
             schema_result,
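With this in place, callers reach a schema through the object-store layer rather than `File::open`. A hedged usage sketch assembled from the helpers exercised in the tests above; it assumes the avro feature is enabled, that `test_util` and the local object-store helpers are public at these paths (the diff only shows them used via `crate::`), and that the arrow testing data is checked out:

use datafusion::datasource::file_format::{avro::AvroFormat, FileFormat};
use datafusion::datasource::object_store::local::local_object_reader_stream;
use datafusion::error::Result;

#[tokio::main]
async fn main() -> Result<()> {
    // This file ships with the arrow testing data used throughout the
    // tests above (requires the ARROW_TEST_DATA checkout).
    let testdata = datafusion::test_util::arrow_test_data();
    let filename = format!("{}/avro/alltypes_plain.avro", testdata);
    // The format receives a stream of object readers, not bare paths.
    let schema = AvroFormat {}
        .infer_schema(local_object_reader_stream(vec![filename]))
        .await?;
    println!("{:?}", schema);
    Ok(())
}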
(The remaining 11 changed files are not shown.)
