diff --git a/benches/avro_read.rs b/benches/avro_read.rs
index 1411ad07d5f..029d3ab7d6b 100644
--- a/benches/avro_read.rs
+++ b/benches/avro_read.rs
@@ -52,6 +52,7 @@ fn read_batch(buffer: &[u8], size: usize) -> Result<()> {
         ),
         avro_schema,
         schema.fields,
+        None,
     );

     let mut rows = 0;
diff --git a/examples/avro_read.rs b/examples/avro_read.rs
index 6d7722ac1d4..2d0148cf9d9 100644
--- a/examples/avro_read.rs
+++ b/examples/avro_read.rs
@@ -20,6 +20,7 @@ fn main() -> Result<()> {
         read::Decompressor::new(read::BlockStreamIterator::new(file, file_marker), codec),
         avro_schema,
         schema.fields,
+        None,
    );

     for maybe_chunk in reader {
diff --git a/examples/avro_read_async.rs b/examples/avro_read_async.rs
index 92e5b420472..6370f6aeca5 100644
--- a/examples/avro_read_async.rs
+++ b/examples/avro_read_async.rs
@@ -20,6 +20,7 @@ async fn main() -> Result<()> {
     let (avro_schemas, schema, compression, marker) = read_metadata(&mut reader).await?;
     let avro_schemas = Arc::new(avro_schemas);
+    let projection = Arc::new(schema.fields.iter().map(|_| true).collect::<Vec<_>>());

     let blocks = block_stream(&mut reader, marker).await;

@@ -27,11 +28,12 @@ while let Some(mut block) = blocks.next().await.transpose()? {
     while let Some(mut block) = blocks.next().await.transpose()? {
         let schema = schema.clone();
         let avro_schemas = avro_schemas.clone();
+        let projection = projection.clone();
         // the content here is CPU-bounded. It should run on a dedicated thread pool
         let handle = tokio::task::spawn_blocking(move || {
             let mut decompressed = Block::new(0, vec![]);
             decompress_block(&mut block, &mut decompressed, compression)?;
-            deserialize(&decompressed, &schema.fields, &avro_schemas)
+            deserialize(&decompressed, &schema.fields, &avro_schemas, &projection)
         });
         let batch = handle.await.unwrap()?;
         assert!(!batch.is_empty());
diff --git a/src/io/avro/read/deserialize.rs b/src/io/avro/read/deserialize.rs
index cc636cc5b77..37c733fc578 100644
--- a/src/io/avro/read/deserialize.rs
+++ b/src/io/avro/read/deserialize.rs
@@ -137,24 +137,6 @@ fn deserialize_value<'a>(
                 array.try_push_valid()?;
             }
         }
-        DataType::Interval(IntervalUnit::MonthDayNano) => {
-            // https://avro.apache.org/docs/current/spec.html#Duration
-            // 12 bytes, months, days, millis in LE
-            let data = &block[..12];
-            block = &block[12..];
-
-            let value = months_days_ns::new(
-                i32::from_le_bytes([data[0], data[1], data[2], data[3]]),
-                i32::from_le_bytes([data[4], data[5], data[6], data[7]]),
-                i32::from_le_bytes([data[8], data[9], data[10], data[11]]) as i64 * 1_000_000,
-            );
-
-            let array = array
-                .as_mut_any()
-                .downcast_mut::<MutablePrimitiveArray<months_days_ns>>()
-                .unwrap();
-            array.push(Some(value))
-        }
         DataType::Struct(inner_fields) => {
             let fields = match avro_field {
                 AvroSchema::Record(Record { fields, .. }) => fields,
@@ -227,6 +209,25 @@ fn deserialize_value<'a>(
                         .unwrap();
                     array.push(Some(value))
                 }
+                PrimitiveType::MonthDayNano => {
+                    // https://avro.apache.org/docs/current/spec.html#Duration
+                    // 12 bytes, months, days, millis in LE
+                    let data = &block[..12];
+                    block = &block[12..];
+
+                    let value = months_days_ns::new(
+                        i32::from_le_bytes([data[0], data[1], data[2], data[3]]),
+                        i32::from_le_bytes([data[4], data[5], data[6], data[7]]),
+                        i32::from_le_bytes([data[8], data[9], data[10], data[11]]) as i64
+                            * 1_000_000,
+                    );
+
+                    let array = array
+                        .as_mut_any()
+                        .downcast_mut::<MutablePrimitiveArray<months_days_ns>>()
+                        .unwrap();
+                    array.push(Some(value))
+                }
                 _ => unreachable!(),
             },
             PhysicalType::Utf8 => {
@@ -283,11 +284,109 @@ fn deserialize_value<'a>(
     Ok(block)
 }

-/// Deserializes a [`Block`] into [`Chunk`].
+fn skip_item<'a>(field: &Field, avro_field: &AvroSchema, mut block: &'a [u8]) -> Result<&'a [u8]> {
+    if field.is_nullable {
+        let variant = util::zigzag_i64(&mut block)?;
+        let is_null_first = is_union_null_first(avro_field);
+        if is_null_first && variant == 0 || !is_null_first && variant != 0 {
+            return Ok(block);
+        }
+    }
+    match &field.data_type {
+        DataType::List(inner) => {
+            let avro_inner = match avro_field {
+                AvroSchema::Array(inner) => inner.as_ref(),
+                AvroSchema::Union(u) => match &u.as_slice() {
+                    &[AvroSchema::Array(inner), _] | &[_, AvroSchema::Array(inner)] => {
+                        inner.as_ref()
+                    }
+                    _ => unreachable!(),
+                },
+                _ => unreachable!(),
+            };
+
+            loop {
+                let len = util::zigzag_i64(&mut block)? as usize;
+
+                if len == 0 {
+                    break;
+                }
+
+                for _ in 0..len {
+                    block = skip_item(inner, avro_inner, block)?;
+                }
+            }
+        }
+        DataType::Struct(inner_fields) => {
+            let fields = match avro_field {
+                AvroSchema::Record(Record { fields, .. }) => fields,
+                AvroSchema::Union(u) => match &u.as_slice() {
+                    &[AvroSchema::Record(Record { fields, .. }), _]
+                    | &[_, AvroSchema::Record(Record { fields, .. })] => fields,
+                    _ => unreachable!(),
+                },
+                _ => unreachable!(),
+            };
+
+            for (field, avro_field) in inner_fields.iter().zip(fields.iter()) {
+                block = skip_item(field, &avro_field.schema, block)?;
+            }
+        }
+        _ => match field.data_type.to_physical_type() {
+            PhysicalType::Boolean => {
+                let _ = block[0] == 1;
+                block = &block[1..];
+            }
+            PhysicalType::Primitive(primitive) => match primitive {
+                PrimitiveType::Int32 => {
+                    let _ = util::zigzag_i64(&mut block)?;
+                }
+                PrimitiveType::Int64 => {
+                    let _ = util::zigzag_i64(&mut block)?;
+                }
+                PrimitiveType::Float32 => {
+                    block = &block[std::mem::size_of::<f32>()..];
+                }
+                PrimitiveType::Float64 => {
+                    block = &block[std::mem::size_of::<f64>()..];
+                }
+                PrimitiveType::MonthDayNano => {
+                    block = &block[12..];
+                }
+                _ => unreachable!(),
+            },
+            PhysicalType::Utf8 | PhysicalType::Binary => {
+                let len: usize = util::zigzag_i64(&mut block)?.try_into().map_err(|_| {
+                    ArrowError::ExternalFormat(
+                        "Avro format contains a non-usize number of bytes".to_string(),
+                    )
+                })?;
+                block = &block[len..];
+            }
+            PhysicalType::FixedSizeBinary => {
+                let len = if let DataType::FixedSizeBinary(len) = &field.data_type {
+                    *len
+                } else {
+                    unreachable!()
+                };
+
+                block = &block[len..];
+            }
+            PhysicalType::Dictionary(_) => {
+                let _ = util::zigzag_i64(&mut block)? as i32;
+            }
+            _ => todo!(),
+        },
+    }
+    Ok(block)
+}
+
+/// Deserializes a [`Block`] into a [`Chunk`], projected to the fields selected by `projection`.
 pub fn deserialize(
     block: &Block,
     fields: &[Field],
     avro_schemas: &[AvroSchema],
+    projection: &[bool],
 ) -> Result<Chunk<Arc<dyn Array>>> {
     let rows = block.number_of_rows;
     let mut block = block.data.as_ref();
@@ -296,21 +395,39 @@ pub fn deserialize(
     let mut arrays: Vec<Box<dyn MutableArray>> = fields
         .iter()
         .zip(avro_schemas.iter())
-        .map(|(field, avro_schema)| {
-            let data_type = field.data_type().to_logical_type();
-            make_mutable(data_type, Some(avro_schema), rows)
+        .zip(projection.iter())
+        .map(|((field, avro_schema), projection)| {
+            if *projection {
+                make_mutable(&field.data_type, Some(avro_schema), rows)
+            } else {
+                // just something; we are not going to use it
+                make_mutable(&DataType::Int32, None, 0)
+            }
         })
         .collect::<Result<_>>()?;

     // this is _the_ expensive transpose (rows -> columns)
     for _ in 0..rows {
-        for ((array, field), avro_field) in arrays
+        let iter = arrays
             .iter_mut()
             .zip(fields.iter())
             .zip(avro_schemas.iter())
-        {
-            block = deserialize_item(array.as_mut(), field.is_nullable, avro_field, block)?
+            .zip(projection.iter());
+
+        for (((array, field), avro_field), projection) in iter {
+            block = if *projection {
+                deserialize_item(array.as_mut(), field.is_nullable, avro_field, block)
+            } else {
+                skip_item(field, avro_field, block)
+            }?
         }
     }
-    Chunk::try_new(arrays.iter_mut().map(|array| array.as_arc()).collect())
+    Chunk::try_new(
+        arrays
+            .iter_mut()
+            .zip(projection.iter())
+            .filter_map(|x| if *x.1 { Some(x.0) } else { None })
+            .map(|array| array.as_arc())
+            .collect(),
+    )
 }
diff --git a/src/io/avro/read/mod.rs b/src/io/avro/read/mod.rs
index b48302d7b27..e437b4ef2da 100644
--- a/src/io/avro/read/mod.rs
+++ b/src/io/avro/read/mod.rs
@@ -17,7 +17,7 @@ mod schema;
 mod util;

 pub(super) use header::deserialize_header;
-pub(super) use schema::convert_schema;
+pub(super) use schema::infer_schema;

 use crate::array::Array;
 use crate::chunk::Chunk;
@@ -32,7 +32,7 @@ pub fn read_metadata<R: std::io::Read>(
     reader: &mut R,
 ) -> Result<(Vec<AvroSchema>, Schema, Option<Compression>, [u8; 16])> {
     let (avro_schema, codec, marker) = util::read_schema(reader)?;
-    let schema = convert_schema(&avro_schema)?;
+    let schema = infer_schema(&avro_schema)?;

     let avro_schema = if let AvroSchema::Record(Record { fields, .. }) = avro_schema {
         fields.into_iter().map(|x| x.schema).collect()
@@ -48,15 +48,23 @@ pub struct Reader<R: Read> {
     iter: Decompressor<R>,
     avro_schemas: Vec<AvroSchema>,
     fields: Vec<Field>,
+    projection: Vec<bool>,
 }

 impl<R: Read> Reader<R> {
     /// Creates a new [`Reader`].
-    pub fn new(iter: Decompressor<R>, avro_schemas: Vec<AvroSchema>, fields: Vec<Field>) -> Self {
+    pub fn new(
+        iter: Decompressor<R>,
+        avro_schemas: Vec<AvroSchema>,
+        fields: Vec<Field>,
+        projection: Option<Vec<bool>>,
+    ) -> Self {
+        let projection = projection.unwrap_or_else(|| fields.iter().map(|_| true).collect());
         Self {
             iter,
             avro_schemas,
             fields,
+            projection,
         }
     }

@@ -72,10 +80,11 @@ impl<R: Read> Iterator for Reader<R> {
     fn next(&mut self) -> Option<Self::Item> {
         let fields = &self.fields[..];
         let avro_schemas = &self.avro_schemas;
+        let projection = &self.projection;

         self.iter
             .next()
             .transpose()
-            .map(|maybe_block| deserialize(maybe_block?, fields, avro_schemas))
+            .map(|maybe_block| deserialize(maybe_block?, fields, avro_schemas, projection))
     }
 }
diff --git a/src/io/avro/read/schema.rs b/src/io/avro/read/schema.rs
index 75e65afa3d8..ff72879d78b 100644
--- a/src/io/avro/read/schema.rs
+++ b/src/io/avro/read/schema.rs
@@ -19,8 +19,9 @@ fn external_props(schema: &AvroSchema) -> Metadata {
     props
 }

-/// Maps an [`AvroSchema`] into a [`Schema`].
-pub fn convert_schema(schema: &AvroSchema) -> Result<Schema> {
+/// Infers a [`Schema`] from the root [`AvroSchema`].
+/// This errors if the Avro schema is not a `Record`.
+pub fn infer_schema(schema: &AvroSchema) -> Result<Schema> {
     if let AvroSchema::Record(Record { fields, .. }) = schema {
         Ok(fields
             .iter()
@@ -35,7 +36,7 @@ pub fn convert_schema(schema: &AvroSchema) -> Result<Schema> {
             .into())
     } else {
         Err(ArrowError::OutOfSpec(
-            "An avro Schema must be of type Record".to_string(),
+            "The root AvroSchema must be of type Record".to_string(),
         ))
     }
 }
diff --git a/src/io/avro/read_async/metadata.rs b/src/io/avro/read_async/metadata.rs
index 1fb3526d91e..3c1b58ac78b 100644
--- a/src/io/avro/read_async/metadata.rs
+++ b/src/io/avro/read_async/metadata.rs
@@ -8,8 +8,8 @@ use futures::AsyncReadExt;
 use crate::datatypes::Schema;
 use crate::error::{ArrowError, Result};

-use super::super::read::convert_schema;
 use super::super::read::deserialize_header;
+use super::super::read::infer_schema;
 use super::super::Compression;
 use super::super::{read_header, read_metadata};
 use super::utils::zigzag_i64;
@@ -28,7 +28,7 @@ pub async fn read_metadata<R: AsyncRead + Unpin + Send>(
     reader: &mut R,
 ) -> Result<(Vec<AvroSchema>, Schema, Option<Compression>, [u8; 16])> {
     let (avro_schema, codec, marker) = read_metadata_async(reader).await?;
-    let schema = convert_schema(&avro_schema)?;
+    let schema = infer_schema(&avro_schema)?;

     let avro_schema = if let AvroSchema::Record(Record { fields, .. }) = avro_schema {
         fields.into_iter().map(|x| x.schema).collect()
diff --git a/tests/it/io/avro/read.rs b/tests/it/io/avro/read.rs
index 8ed77f03d38..46897ecceee 100644
--- a/tests/it/io/avro/read.rs
+++ b/tests/it/io/avro/read.rs
@@ -174,7 +174,10 @@ pub(super) fn write_avro(codec: Codec) -> std::result::Result<Vec<u8>, avro_rs::Error> {
     Ok(writer.into_inner().unwrap())
 }

-pub(super) fn read_avro(mut avro: &[u8]) -> Result<(Chunk<Arc<dyn Array>>, Schema)> {
+pub(super) fn read_avro(
+    mut avro: &[u8],
+    projection: Option<Vec<bool>>,
+) -> Result<(Chunk<Arc<dyn Array>>, Schema)> {
     let file = &mut avro;

     let (avro_schema, schema, codec, file_marker) = read::read_metadata(file)?;
@@ -183,8 +186,21 @@
         read::Decompressor::new(read::BlockStreamIterator::new(file, file_marker), codec),
         avro_schema,
         schema.fields.clone(),
+        projection.clone(),
     );

+    let schema = if let Some(projection) = projection {
+        let fields = schema
+            .fields
+            .into_iter()
+            .zip(projection.iter())
+            .filter_map(|x| if *x.1 { Some(x.0) } else { None })
+            .collect::<Vec<_>>();
+        Schema::from(fields)
+    } else {
+        schema
+    };
+
     reader.next().unwrap().map(|x| (x, schema))
 }

@@ -193,7 +209,7 @@ fn test(codec: Codec) -> Result<()> {
     let expected = data();
     let (_, expected_schema) = schema();

-    let (result, schema) = read_avro(&avro)?;
+    let (result, schema) = read_avro(&avro, None)?;

     assert_eq!(schema, expected_schema);
     assert_eq!(result, expected);
@@ -214,3 +230,36 @@ fn read_deflate() -> Result<()> {
 fn read_snappy() -> Result<()> {
     test(Codec::Snappy)
 }
+
+fn test_projected(projection: Vec<bool>) -> Result<()> {
+    let avro = write_avro(Codec::Null).unwrap();
+    let expected = data();
+    let expected = expected
+        .into_arrays()
+        .into_iter()
+        .zip(projection.iter())
+        .filter_map(|x| if *x.1 { Some(x.0) } else { None })
+        .collect();
+    let expected = Chunk::new(expected);
+    let (_, expected_schema) = schema();
+    let expected_fields = expected_schema
+        .fields
+        .into_iter()
+        .zip(projection.iter())
+        .filter_map(|x| if *x.1 { Some(x.0) } else { None })
+        .collect::<Vec<_>>();
+    let expected_schema = Schema::from(expected_fields);
+
+    let (result, schema) = read_avro(&avro, Some(projection))?;
+
+    assert_eq!(schema, expected_schema);
+    assert_eq!(result, expected);
+    Ok(())
+}
+
+#[test]
+fn read_projected() -> Result<()> {
+    test_projected(vec![
+        true, false, false, false, false, false, false, false, false, false, false,
+    ])
+}
diff --git a/tests/it/io/avro/write.rs b/tests/it/io/avro/write.rs
index b605dc245d3..991cc59a76a 100644
--- a/tests/it/io/avro/write.rs
+++ b/tests/it/io/avro/write.rs
@@ -117,7 +117,7 @@ fn roundtrip(compression: Option<Compression>) -> Result<()> {
     let data = write_avro(&expected, &expected_schema, compression)?;

-    let (result, read_schema) = read_avro(&data)?;
+    let (result, read_schema) = read_avro(&data, None)?;

     assert_eq!(expected_schema, read_schema);
     for (c1, c2) in result.columns().iter().zip(expected.columns().iter()) {
diff --git a/tests/it/io/avro/write_async.rs b/tests/it/io/avro/write_async.rs
index 6b2bf2a39fd..1c679c564c7 100644
--- a/tests/it/io/avro/write_async.rs
+++ b/tests/it/io/avro/write_async.rs
@@ -32,7 +32,7 @@ async fn roundtrip(compression: Option<Compression>) -> Result<()> {
     let data = write_avro(&expected, &expected_schema, compression).await?;

-    let (result, read_schema) = read_avro(&data)?;
+    let (result, read_schema) = read_avro(&data, None)?;

     assert_eq!(expected_schema, read_schema);
     for (c1, c2) in result.columns().iter().zip(expected.columns().iter()) {
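
Usage note: with this change, the sync `read::Reader::new` takes a fourth argument, `projection: Option<Vec<bool>>`, with one flag per top-level field; `None` keeps the previous behavior of deserializing every column. A minimal sketch of reading only the first column, following the setup in `examples/avro_read.rs` (the file path is illustrative):

```rust
use std::fs::File;
use std::io::BufReader;

use arrow2::error::Result;
use arrow2::io::avro::read;

fn main() -> Result<()> {
    // hypothetical input file; any Avro container file works here
    let file = &mut BufReader::new(File::open("sample.avro")?);

    let (avro_schemas, schema, codec, file_marker) = read::read_metadata(file)?;

    // keep only the first top-level field
    let projection: Vec<bool> = (0..schema.fields.len()).map(|i| i == 0).collect();

    let reader = read::Reader::new(
        read::Decompressor::new(read::BlockStreamIterator::new(file, file_marker), codec),
        avro_schemas,
        schema.fields,
        Some(projection),
    );

    for maybe_chunk in reader {
        // each chunk contains only the projected columns; skipped columns
        // are walked past in the block (via `skip_item`), not deserialized
        let chunk = maybe_chunk?;
        assert_eq!(chunk.arrays().len(), 1);
    }
    Ok(())
}
```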