From 69be888ca33acf9e63cefc61de669d309a512dfc Mon Sep 17 00:00:00 2001
From: "Jorge C. Leitao" <jorgecarleitao@gmail.com>
Date: Tue, 7 Jun 2022 16:48:56 +0000
Subject: [PATCH] Added validation of number of encodings

---
 src/io/parquet/write/row_group.rs | 22 ++++++++++++++++++++++
 src/io/parquet/write/sink.rs      | 24 +++++++++++++++++++-----
 2 files changed, 41 insertions(+), 5 deletions(-)

diff --git a/src/io/parquet/write/row_group.rs b/src/io/parquet/write/row_group.rs
index b6c4da45625..b9960b56001 100644
--- a/src/io/parquet/write/row_group.rs
+++ b/src/io/parquet/write/row_group.rs
@@ -17,12 +17,18 @@ use super::{
 
 /// Maps a [`Chunk`] and parquet-specific options to an [`RowGroupIter`] used to
 /// write to parquet
+/// # Panics
+/// Iff
+/// * `encodings.len() != fields.len()` or
+/// * `encodings.len() != chunk.arrays().len()`
 pub fn row_group_iter<A: AsRef<dyn Array> + 'static + Send + Sync>(
     chunk: Chunk<A>,
     encodings: Vec<Vec<Encoding>>,
     fields: Vec<ParquetType>,
     options: WriteOptions,
 ) -> RowGroupIter<'static, Error> {
+    assert_eq!(encodings.len(), fields.len());
+    assert_eq!(encodings.len(), chunk.arrays().len());
     DynIter::new(
         chunk
             .into_arrays()
@@ -63,12 +69,22 @@ pub struct RowGroupIterator<A: AsRef<dyn Array> + 'static, I: Iterator<Item = R
 
 impl<A: AsRef<dyn Array> + 'static, I: Iterator<Item = Result<Chunk<A>>>> RowGroupIterator<A, I> {
     /// Creates a new [`RowGroupIterator`] from an iterator over [`Chunk`].
+    ///
+    /// # Errors
+    /// Iff
+    /// * the Arrow schema can't be converted to a valid Parquet schema.
+    /// * the length of the encodings is different from the number of fields in schema
     pub fn try_new(
         iter: I,
         schema: &Schema,
         options: WriteOptions,
         encodings: Vec<Vec<Encoding>>,
     ) -> Result<Self> {
+        if encodings.len() != schema.fields.len() {
+            return Err(Error::InvalidArgumentError(
+                "The number of encodings must equal the number of fields".to_string(),
+            ));
+        }
         let parquet_schema = to_parquet_schema(schema)?;
 
         Ok(Self {
@@ -95,6 +111,12 @@ impl<A: AsRef<dyn Array> + 'static + Send + Sync, I: Iterator<Item = Result<Chu
 
         self.iter.next().map(|maybe_chunk| {
             let chunk = maybe_chunk?;
+            if self.encodings.len() != chunk.arrays().len() {
+                return Err(Error::InvalidArgumentError(
+                    "The number of arrays in the chunk must equal the number of encodings"
+                        .to_string(),
+                ));
+            };
             let encodings = self.encodings.clone();
             Ok(row_group_iter(
                 chunk,
diff --git a/src/io/parquet/write/sink.rs b/src/io/parquet/write/sink.rs
--- a/src/io/parquet/write/sink.rs
+++ b/src/io/parquet/write/sink.rs
@@ -41,7 +41,7 @@ pub struct FileSink<W: AsyncWrite + Send + Unpin> {
     writer: Option<FileStreamer<W>>,
     task: Option<BoxFuture<'static, Result<Option<FileStreamer<W>>, Error>>>,
     options: WriteOptions,
-    encoding: Vec<Vec<Encoding>>,
+    encodings: Vec<Vec<Encoding>>,
     schema: Schema,
     parquet_schema: SchemaDescriptor,
     /// Key-value metadata that will be written to the file on close.
@@ -73,13 +73,21 @@ where
     /// Create a new sink that writes arrays to the provided `writer`.
     ///
     /// # Error
-    /// If the Arrow schema can't be converted to a valid Parquet schema.
+    /// Iff
+    /// * the Arrow schema can't be converted to a valid Parquet schema.
+    /// * the length of the encodings is different from the number of fields in schema
     pub fn try_new(
         writer: W,
         schema: Schema,
-        encoding: Vec<Vec<Encoding>>,
+        encodings: Vec<Vec<Encoding>>,
         options: WriteOptions,
     ) -> Result<Self, Error> {
+        if encodings.len() != schema.fields.len() {
+            return Err(Error::InvalidArgumentError(
+                "The number of encodings must equal the number of fields".to_string(),
+            ));
+        }
+
         let parquet_schema = crate::io::parquet::write::to_parquet_schema(&schema)?;
         let created_by = Some("Arrow2 - Native Rust implementation of Arrow".to_string());
         let writer = FileStreamer::new(
@@ -96,7 +104,7 @@ where
             task: None,
             options,
             schema,
-            encoding,
+            encodings,
             parquet_schema,
             metadata: HashMap::default(),
         })
@@ -146,11 +154,17 @@ where
     type Error = Error;
 
     fn start_send(self: Pin<&mut Self>, item: Chunk<Box<dyn Array>>) -> Result<(), Self::Error> {
+        if self.schema.fields.len() != item.arrays().len() {
+            return Err(Error::InvalidArgumentError(
+                "The number of arrays in the chunk must equal the number of fields in the schema"
+                    .to_string(),
+            ));
+        }
         let this = self.get_mut();
         if let Some(mut writer) = this.writer.take() {
             let rows = crate::io::parquet::write::row_group_iter(
                 item,
-                this.encodings.clone(),
+                this.encodings.clone(),
                 this.parquet_schema.fields().to_vec(),
                 this.options,
             );
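
For context, a minimal sketch (not part of the patch) of how the new validation
surfaces to a caller of `RowGroupIterator::try_new`. It assumes the arrow2 write
API around this revision; the exact `WriteOptions` fields and the
`CompressionOptions` name vary between arrow2 versions:

    use arrow2::array::{Array, Int32Array};
    use arrow2::chunk::Chunk;
    use arrow2::datatypes::{DataType, Field, Schema};
    use arrow2::error::Result;
    use arrow2::io::parquet::write::{
        CompressionOptions, Encoding, RowGroupIterator, Version, WriteOptions,
    };

    fn main() -> Result<()> {
        // A schema with two fields...
        let schema = Schema::from(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, false),
        ]);
        // ...and a chunk with two matching arrays.
        let chunk = Chunk::try_new(vec![
            Box::new(Int32Array::from_slice([1, 2, 3])) as Box<dyn Array>,
            Box::new(Int32Array::from_slice([4, 5, 6])) as Box<dyn Array>,
        ])?;
        let options = WriteOptions {
            write_statistics: true,
            compression: CompressionOptions::Uncompressed,
            version: Version::V2,
        };

        // Only one entry in `encodings` for two fields: with this patch,
        // try_new rejects the mismatch up front (Error::InvalidArgumentError)
        // instead of failing somewhere in the middle of a write.
        let row_groups = RowGroupIterator::try_new(
            vec![Ok(chunk)].into_iter(),
            &schema,
            options,
            vec![vec![Encoding::Plain]],
        );
        assert!(row_groups.is_err());
        Ok(())
    }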