Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor providers store #1

Merged
merged 17 commits into from
Oct 6, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions datafusion/src/avro_to_arrow/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,19 @@ use crate::arrow::datatypes::Schema;
use crate::error::Result;
#[cfg(feature = "avro")]
pub use reader::{Reader, ReaderBuilder};
use std::io::{Read, Seek};
use std::io::Read;

#[cfg(feature = "avro")]
/// Read Avro schema given a reader
pub fn read_avro_schema_from_reader<R: Read + Seek>(reader: &mut R) -> Result<Schema> {
pub fn read_avro_schema_from_reader<R: Read>(reader: &mut R) -> Result<Schema> {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 nice cleanup (and makes this code easier to use)

let avro_reader = avro_rs::Reader::new(reader)?;
let schema = avro_reader.writer_schema();
schema::to_arrow_schema(schema)
}

#[cfg(not(feature = "avro"))]
/// Read Avro schema given a reader (requires the avro feature)
pub fn read_avro_schema_from_reader<R: Read + Seek>(_: &mut R) -> Result<Schema> {
pub fn read_avro_schema_from_reader<R: Read>(_: &mut R) -> Result<Schema> {
Err(crate::error::DataFusionError::NotImplemented(
"cannot read avro schema without the 'avro' feature enabled".to_string(),
))
Expand Down
111 changes: 69 additions & 42 deletions datafusion/src/datasource/file_format/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,55 +23,48 @@ use arrow::datatypes::Schema;
use arrow::{self, datatypes::SchemaRef};
use async_trait::async_trait;
use futures::StreamExt;
use std::fs::File;

use super::PartitionedFile;
use super::{FileFormat, StringStream};
use super::{FileFormat, PhysicalPlanConfig};
use crate::avro_to_arrow::read_avro_schema_from_reader;
use crate::datasource::object_store::{ObjectReader, ObjectReaderStream};
use crate::error::Result;
use crate::logical_plan::Expr;
use crate::physical_plan::file_format::AvroExec;
use crate::physical_plan::ExecutionPlan;
use crate::physical_plan::Statistics;

/// Line-delimited Avro `FileFormat` implementation.
pub struct AvroFormat {}
/// Avro `FileFormat` implementation.
pub struct AvroFormat;

#[async_trait]
impl FileFormat for AvroFormat {
async fn infer_schema(&self, mut paths: StringStream) -> Result<SchemaRef> {
async fn infer_schema(&self, mut readers: ObjectReaderStream) -> Result<SchemaRef> {
let mut schemas = vec![];
while let Some(filename) = paths.next().await {
let mut file = File::open(filename)?;
let schema = read_avro_schema_from_reader(&mut file)?;
while let Some(obj_reader) = readers.next().await {
let mut reader = obj_reader?.sync_reader()?;
let schema = read_avro_schema_from_reader(&mut reader)?;
schemas.push(schema);
}
let merged_schema = Schema::try_merge(schemas)?;
Ok(Arc::new(merged_schema))
}

async fn infer_stats(&self, _path: &str) -> Result<Statistics> {
async fn infer_stats(&self, _reader: Arc<dyn ObjectReader>) -> Result<Statistics> {
Ok(Statistics::default())
}

async fn create_physical_plan(
&self,
schema: SchemaRef,
files: Vec<Vec<PartitionedFile>>,
statistics: Statistics,
projection: &Option<Vec<usize>>,
batch_size: usize,
_filters: &[Expr],
limit: Option<usize>,
conf: PhysicalPlanConfig,
) -> Result<Arc<dyn ExecutionPlan>> {
let exec = AvroExec::new(
conf.object_store,
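// flattening this for now because AvroExec does not support partitioning yet
// NOTE(review): this comment previously named CsvExec, which looks like a copy-paste slip; confirm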
files.into_iter().flatten().map(|f| f.path).collect(),
statistics,
schema,
projection.clone(),
batch_size,
limit,
conf.files.into_iter().flatten().collect(),
conf.statistics,
conf.schema,
conf.projection,
conf.batch_size,
conf.limit,
);
Ok(Arc::new(exec))
}
Expand All @@ -80,8 +73,16 @@ impl FileFormat for AvroFormat {
#[cfg(test)]
#[cfg(feature = "avro")]
mod tests {
use crate::datasource::file_format::string_stream;
use crate::physical_plan::collect;
use crate::{
datasource::{
file_format::PartitionedFile,
object_store::local::{
local_file_meta, local_object_reader, local_object_reader_stream,
LocalFileSystem,
},
},
physical_plan::collect,
};

use super::*;
use arrow::array::{
Expand All @@ -93,10 +94,10 @@ mod tests {
#[tokio::test]
async fn read_small_batches() -> Result<()> {
let projection = None;
let exec = get_exec("alltypes_plain.avro", &projection, 2).await?;
let exec = get_exec("alltypes_plain.avro", &projection, 2, None).await?;
let stream = exec.execute(0).await?;

let _ = stream
let tt_batches = stream
.map(|batch| {
let batch = batch.unwrap();
assert_eq!(11, batch.num_columns());
Expand All @@ -105,13 +106,27 @@ mod tests {
.fold(0, |acc, _| async move { acc + 1i32 })
.await;

assert_eq!(tt_batches, 4 /* 8/2 */);

Ok(())
}

#[tokio::test]
async fn read_limit() -> Result<()> {
    // No projection, a batch size of 1024, and a row limit of one.
    let no_projection = None;
    let plan = get_exec("alltypes_plain.avro", &no_projection, 1024, Some(1)).await?;

    let results = collect(plan).await?;

    // Expect exactly one batch containing a single row with 11 columns.
    assert_eq!(1, results.len());
    let only_batch = &results[0];
    assert_eq!(11, only_batch.num_columns());
    assert_eq!(1, only_batch.num_rows());

    Ok(())
}

#[tokio::test]
async fn read_alltypes_plain_avro() -> Result<()> {
let projection = None;
let exec = get_exec("alltypes_plain.avro", &projection, 1024).await?;
let exec = get_exec("alltypes_plain.avro", &projection, 1024, None).await?;

let x: Vec<String> = exec
.schema()
Expand Down Expand Up @@ -161,7 +176,7 @@ mod tests {
#[tokio::test]
async fn read_bool_alltypes_plain_avro() -> Result<()> {
let projection = Some(vec![1]);
let exec = get_exec("alltypes_plain.avro", &projection, 1024).await?;
let exec = get_exec("alltypes_plain.avro", &projection, 1024, None).await?;

let batches = collect(exec).await?;
assert_eq!(batches.len(), 1);
Expand Down Expand Up @@ -189,7 +204,7 @@ mod tests {
#[tokio::test]
async fn read_i32_alltypes_plain_avro() -> Result<()> {
let projection = Some(vec![0]);
let exec = get_exec("alltypes_plain.avro", &projection, 1024).await?;
let exec = get_exec("alltypes_plain.avro", &projection, 1024, None).await?;

let batches = collect(exec).await?;
assert_eq!(batches.len(), 1);
Expand All @@ -214,7 +229,7 @@ mod tests {
#[tokio::test]
async fn read_i96_alltypes_plain_avro() -> Result<()> {
let projection = Some(vec![10]);
let exec = get_exec("alltypes_plain.avro", &projection, 1024).await?;
let exec = get_exec("alltypes_plain.avro", &projection, 1024, None).await?;

let batches = collect(exec).await?;
assert_eq!(batches.len(), 1);
Expand All @@ -239,7 +254,7 @@ mod tests {
#[tokio::test]
async fn read_f32_alltypes_plain_avro() -> Result<()> {
let projection = Some(vec![6]);
let exec = get_exec("alltypes_plain.avro", &projection, 1024).await?;
let exec = get_exec("alltypes_plain.avro", &projection, 1024, None).await?;

let batches = collect(exec).await?;
assert_eq!(batches.len(), 1);
Expand Down Expand Up @@ -267,7 +282,7 @@ mod tests {
#[tokio::test]
async fn read_f64_alltypes_plain_avro() -> Result<()> {
let projection = Some(vec![7]);
let exec = get_exec("alltypes_plain.avro", &projection, 1024).await?;
let exec = get_exec("alltypes_plain.avro", &projection, 1024, None).await?;

let batches = collect(exec).await?;
assert_eq!(batches.len(), 1);
Expand Down Expand Up @@ -295,7 +310,7 @@ mod tests {
#[tokio::test]
async fn read_binary_alltypes_plain_avro() -> Result<()> {
let projection = Some(vec![9]);
let exec = get_exec("alltypes_plain.avro", &projection, 1024).await?;
let exec = get_exec("alltypes_plain.avro", &projection, 1024, None).await?;

let batches = collect(exec).await?;
assert_eq!(batches.len(), 1);
Expand Down Expand Up @@ -324,21 +339,33 @@ mod tests {
file_name: &str,
projection: &Option<Vec<usize>>,
batch_size: usize,
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
let testdata = crate::test_util::arrow_test_data();
let filename = format!("{}/avro/{}", testdata, file_name);
let format = AvroFormat {};
let schema = format
.infer_schema(string_stream(vec![filename.clone()]))
.infer_schema(local_object_reader_stream(vec![filename.clone()]))
.await
.expect("Schema inference");
let stats = format
.infer_stats(&filename)
let statistics = format
.infer_stats(local_object_reader(filename.clone()))
.await
.expect("Stats inference");
let files = vec![vec![PartitionedFile { path: filename }]];
let files = vec![vec![PartitionedFile {
file_meta: local_file_meta(filename.to_owned()),
}]];
let exec = format
.create_physical_plan(schema, files, stats, projection, batch_size, &[], None)
.create_physical_plan(PhysicalPlanConfig {
object_store: Arc::new(LocalFileSystem {}),
schema,
files,
statistics,
projection: projection.clone(),
batch_size,
filters: vec![],
limit,
})
.await?;
Ok(exec)
}
Expand All @@ -349,15 +376,15 @@ mod tests {
mod tests {
use super::*;

use crate::datasource::file_format::string_stream;
use crate::datasource::object_store::local::local_object_reader_stream;
use crate::error::DataFusionError;

#[tokio::test]
async fn test() -> Result<()> {
let testdata = crate::test_util::arrow_test_data();
let filename = format!("{}/avro/alltypes_plain.avro", testdata);
let schema_result = AvroFormat {}
.infer_schema(string_stream(vec![filename]))
.infer_schema(local_object_reader_stream(vec![filename]))
.await;
assert!(matches!(
schema_result,
Expand Down
Loading