Skip to content

Commit

Permalink
refactor: rename infer_avro_schema_from_reader to read_avro_schema_from_reader
Browse files Browse the repository at this point in the history
  • Loading branch information
Igosuki committed Sep 2, 2021
1 parent 01b055f commit e05ccfc
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 22 deletions.
2 changes: 1 addition & 1 deletion datafusion/src/avro_to_arrow/arrow_array_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -979,7 +979,7 @@ mod test {
let testdata = crate::test_util::arrow_test_data();
let filename = format!("{}/avro/{}", testdata, name);
let builder = ReaderBuilder::new()
.infer_schema()
.read_schema()
.with_batch_size(batch_size);
builder.build(File::open(filename).unwrap()).unwrap()
}
Expand Down
10 changes: 5 additions & 5 deletions datafusion/src/avro_to_arrow/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,17 @@ pub use reader::{Reader, ReaderBuilder};
use std::io::{Read, Seek};

#[cfg(feature = "avro")]
/// Infer Avro schema given a reader
pub fn infer_avro_schema_from_reader<R: Read + Seek>(reader: &mut R) -> Result<Schema> {
/// Read Avro schema given a reader
pub fn read_avro_schema_from_reader<R: Read + Seek>(reader: &mut R) -> Result<Schema> {
let avro_reader = avro_rs::Reader::new(reader)?;
let schema = avro_reader.writer_schema();
schema::to_arrow_schema(schema)
}

#[cfg(not(feature = "avro"))]
/// Infer Avro schema given a reader (requires the avro feature)
pub fn infer_avro_schema_from_reader<R: Read + Seek>(_: &mut R) -> Result<Schema> {
/// Read Avro schema given a reader (requires the avro feature)
pub fn read_avro_schema_from_reader<R: Read + Seek>(_: &mut R) -> Result<Schema> {
Err(crate::error::DataFusionError::NotImplemented(
"cannot infer avro schema without the 'avro' feature enabled".to_string(),
"cannot read avro schema without the 'avro' feature enabled".to_string(),
))
}
11 changes: 5 additions & 6 deletions datafusion/src/avro_to_arrow/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ use std::sync::Arc;
pub struct ReaderBuilder {
/// Optional schema for the Avro file
///
/// If the schema is not supplied, the reader will try to infer the schema
/// based on the Avro structure.
/// If the schema is not supplied, the reader will try to read the schema.
schema: Option<SchemaRef>,
/// Batch size (number of records to load each time)
///
Expand Down Expand Up @@ -65,7 +64,7 @@ impl ReaderBuilder {
/// let file = File::open("test/data/basic.avro").unwrap();
///
/// // create a builder that reads the schema from the Avro file and uses a batch size of 100 records
/// let builder = crate::datafusion::avro_to_arrow::ReaderBuilder::new().infer_schema().with_batch_size(100);
/// let builder = crate::datafusion::avro_to_arrow::ReaderBuilder::new().read_schema().with_batch_size(100);
///
/// let reader = builder.build::<File>(file).unwrap();
///
Expand All @@ -83,7 +82,7 @@ impl ReaderBuilder {
}

/// Set the Avro reader to infer the schema of the file
pub fn infer_schema(mut self) -> Self {
pub fn read_schema(mut self) -> Self {
// remove any schema that is set
self.schema = None;
self
Expand Down Expand Up @@ -111,7 +110,7 @@ impl ReaderBuilder {
// check if schema should be inferred
let schema = match self.schema {
Some(schema) => schema,
None => Arc::new(super::infer_avro_schema_from_reader(&mut source)?),
None => Arc::new(super::read_avro_schema_from_reader(&mut source)?),
};
source.seek(SeekFrom::Start(0))?;
Reader::try_new(source, schema, self.batch_size, self.projection)
Expand Down Expand Up @@ -180,7 +179,7 @@ mod tests {
fn build_reader(name: &str) -> Reader<File> {
let testdata = crate::test_util::arrow_test_data();
let filename = format!("{}/avro/{}", testdata, name);
let builder = ReaderBuilder::new().infer_schema().with_batch_size(64);
let builder = ReaderBuilder::new().read_schema().with_batch_size(64);
builder.build(File::open(filename).unwrap()).unwrap()
}

Expand Down
10 changes: 5 additions & 5 deletions datafusion/src/datasource/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,14 @@ pub struct AvroFile {
}

impl AvroFile {
/// Attempt to initialize a `AvroFile` from a path. The schema can be inferred automatically.
/// Attempt to initialize a `AvroFile` from a path. The schema can be read automatically.
pub fn try_new(path: &str, options: AvroReadOptions) -> Result<Self> {
let schema = if let Some(schema) = options.schema {
schema
} else {
let filenames =
common::build_checked_file_list(path, options.file_extension)?;
Arc::new(AvroExec::try_infer_schema(&filenames)?)
Arc::new(AvroExec::try_read_schema(&filenames)?)
};

Ok(Self {
Expand Down Expand Up @@ -89,16 +89,16 @@ impl AvroFile {
})
}

/// Attempt to initialize an AvroFile from a reader impls Seek. The schema can be inferred automatically.
pub fn try_new_from_reader_infer_schema<R: Read + Seek + Send + Sync + 'static>(
/// Attempt to initialize an AvroFile from a reader impls Seek. The schema can be read automatically.
pub fn try_new_from_reader_schema<R: Read + Seek + Send + Sync + 'static>(
mut reader: R,
options: AvroReadOptions,
) -> Result<Self> {
let schema = {
if let Some(schema) = options.schema {
schema
} else {
Arc::new(crate::avro_to_arrow::infer_avro_schema_from_reader(
Arc::new(crate::avro_to_arrow::read_avro_schema_from_reader(
&mut reader,
)?)
}
Expand Down
10 changes: 5 additions & 5 deletions datafusion/src/physical_plan/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
#[cfg(feature = "avro")]
use super::RecordBatchStream;
use super::{common, source::Source, ExecutionPlan, Partitioning};
use crate::avro_to_arrow::infer_avro_schema_from_reader;
use crate::avro_to_arrow::read_avro_schema_from_reader;
use crate::error::{DataFusionError, Result};
use crate::physical_plan::DisplayFormatType;
use arrow::datatypes::{Schema, SchemaRef};
Expand Down Expand Up @@ -98,7 +98,7 @@ impl AvroExec {

let schema = match options.schema {
Some(s) => s,
None => Arc::new(AvroExec::try_infer_schema(filenames.as_slice())?),
None => Arc::new(AvroExec::try_read_schema(filenames.as_slice())?),
};

let projected_schema = match &projection {
Expand Down Expand Up @@ -192,12 +192,12 @@ impl AvroExec {
self.limit
}

/// Infer schema for given Avro dataset
pub fn try_infer_schema(filenames: &[String]) -> Result<Schema> {
/// Read schema for given Avro dataset
pub fn try_read_schema(filenames: &[String]) -> Result<Schema> {
let mut schemas = Vec::new();
for filename in filenames {
let mut file = File::open(filename)?;
let schema = infer_avro_schema_from_reader(&mut file)?;
let schema = read_avro_schema_from_reader(&mut file)?;
schemas.push(schema);
}

Expand Down

0 comments on commit e05ccfc

Please sign in to comment.