Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Fixed StackOverflow in many row groups
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed Aug 4, 2022
1 parent 497d431 commit e9bcd47
Showing 1 changed file with 16 additions and 20 deletions.
36 changes: 16 additions & 20 deletions src/io/parquet/read/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,10 +179,9 @@ pub struct RowGroupReader<R: Read + Seek> {
reader: R,
schema: Schema,
groups_filter: Option<GroupFilter>,
row_groups: Vec<RowGroupMetaData>,
row_groups: std::iter::Enumerate<std::vec::IntoIter<RowGroupMetaData>>,
chunk_size: Option<usize>,
remaining_rows: usize,
current_group: usize,
}

impl<R: Read + Seek> RowGroupReader<R> {
Expand All @@ -199,10 +198,9 @@ impl<R: Read + Seek> RowGroupReader<R> {
reader,
schema,
groups_filter,
row_groups,
row_groups: row_groups.into_iter().enumerate(),
chunk_size,
remaining_rows: limit.unwrap_or(usize::MAX),
current_group: 0,
}
}

Expand All @@ -216,28 +214,27 @@ impl<R: Read + Seek> RowGroupReader<R> {
if self.schema.fields.is_empty() {
return Ok(None);
}
if self.current_group == self.row_groups.len() {
// reached the last row group
return Ok(None);
};
if self.remaining_rows == 0 {
// reached the limit
return Ok(None);
}

let current_row_group = self.current_group;
let row_group = &self.row_groups[current_row_group];
if let Some(groups_filter) = self.groups_filter.as_ref() {
if !(groups_filter)(current_row_group, row_group) {
self.current_group += 1;
return self._next();
}
}
self.current_group += 1;
let row_group = if let Some(groups_filter) = self.groups_filter.as_ref() {
self.row_groups
.by_ref()
.find(|(index, row_group)| !(groups_filter)(*index, row_group))
} else {
self.row_groups.next()
};
let row_group = if let Some((_, row_group)) = row_group {
row_group
} else {
return Ok(None);
};

let column_chunks = read_columns_many(
&mut self.reader,
row_group,
&row_group,
self.schema.fields.clone(),
self.chunk_size,
Some(self.remaining_rows),
Expand All @@ -263,7 +260,6 @@ impl<R: Read + Seek> Iterator for RowGroupReader<R> {
}

fn size_hint(&self) -> (usize, Option<usize>) {
let len = self.row_groups.len() - self.current_group;
(len, Some(len))
self.row_groups.size_hint()
}
}

0 comments on commit e9bcd47

Please sign in to comment.