Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Fixed missing validation of number of encodings passed when writing to parquet #1057

Merged
merged 1 commit into from
Jun 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions src/io/parquet/write/row_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,18 @@ use super::{

/// Maps a [`Chunk`] and parquet-specific options to an [`RowGroupIter`] used to
/// write to parquet
/// # Panics
/// Iff
/// * `encodings.len() != fields.len()` or
/// * `encodings.len() != chunk.arrays().len()`
pub fn row_group_iter<A: AsRef<dyn Array> + 'static + Send + Sync>(
chunk: Chunk<A>,
encodings: Vec<Vec<Encoding>>,
fields: Vec<ParquetType>,
options: WriteOptions,
) -> RowGroupIter<'static, Error> {
assert_eq!(encodings.len(), fields.len());
assert_eq!(encodings.len(), chunk.arrays().len());
DynIter::new(
chunk
.into_arrays()
Expand Down Expand Up @@ -63,12 +69,22 @@ pub struct RowGroupIterator<A: AsRef<dyn Array> + 'static, I: Iterator<Item = Re

impl<A: AsRef<dyn Array> + 'static, I: Iterator<Item = Result<Chunk<A>>>> RowGroupIterator<A, I> {
/// Creates a new [`RowGroupIterator`] from an iterator over [`Chunk`].
///
/// # Errors
/// Iff
/// * the Arrow schema can't be converted to a valid Parquet schema.
/// * the number of encodings differs from the number of fields in the schema
pub fn try_new(
iter: I,
schema: &Schema,
options: WriteOptions,
encodings: Vec<Vec<Encoding>>,
) -> Result<Self> {
if encodings.len() != schema.fields.len() {
return Err(Error::InvalidArgumentError(
"The number of encodings must equal the number of fields".to_string(),
));
}
let parquet_schema = to_parquet_schema(schema)?;

Ok(Self {
Expand All @@ -95,6 +111,12 @@ impl<A: AsRef<dyn Array> + 'static + Send + Sync, I: Iterator<Item = Result<Chun

self.iter.next().map(|maybe_chunk| {
let chunk = maybe_chunk?;
if self.encodings.len() != chunk.arrays().len() {
return Err(Error::InvalidArgumentError(
"The number of arrays in the chunk must equal the number of fields in the schema"
.to_string(),
));
};
let encodings = self.encodings.clone();
Ok(row_group_iter(
chunk,
Expand Down
24 changes: 19 additions & 5 deletions src/io/parquet/write/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ pub struct FileSink<'a, W: AsyncWrite + Send + Unpin> {
writer: Option<FileStreamer<W>>,
task: Option<BoxFuture<'a, Result<Option<FileStreamer<W>>, Error>>>,
options: WriteOptions,
encoding: Vec<Vec<Encoding>>,
encodings: Vec<Vec<Encoding>>,
schema: Schema,
parquet_schema: SchemaDescriptor,
/// Key-value metadata that will be written to the file on close.
Expand All @@ -73,13 +73,21 @@ where
/// Create a new sink that writes arrays to the provided `writer`.
///
/// # Errors
/// If the Arrow schema can't be converted to a valid Parquet schema.
/// Iff
/// * the Arrow schema can't be converted to a valid Parquet schema.
/// * the length of the encodings is different from the number of fields in schema
pub fn try_new(
writer: W,
schema: Schema,
encoding: Vec<Vec<Encoding>>,
encodings: Vec<Vec<Encoding>>,
options: WriteOptions,
) -> Result<Self, Error> {
if encodings.len() != schema.fields.len() {
return Err(Error::InvalidArgumentError(
"The number of encodings must equal the number of fields".to_string(),
));
}

let parquet_schema = crate::io::parquet::write::to_parquet_schema(&schema)?;
let created_by = Some("Arrow2 - Native Rust implementation of Arrow".to_string());
let writer = FileStreamer::new(
Expand All @@ -96,7 +104,7 @@ where
task: None,
options,
schema,
encoding,
encodings,
parquet_schema,
metadata: HashMap::default(),
})
Expand Down Expand Up @@ -146,11 +154,17 @@ where
type Error = Error;

fn start_send(self: Pin<&mut Self>, item: Chunk<Arc<dyn Array>>) -> Result<(), Self::Error> {
if self.schema.fields.len() != item.arrays().len() {
return Err(Error::InvalidArgumentError(
"The number of arrays in the chunk must equal the number of fields in the schema"
.to_string(),
));
}
let this = self.get_mut();
if let Some(mut writer) = this.writer.take() {
let rows = crate::io::parquet::write::row_group_iter(
item,
this.encoding.clone(),
this.encodings.clone(),
this.parquet_schema.fields().to_vec(),
this.options,
);
Expand Down