Skip to content

Commit

Permalink
Simplify parquet arror RecordReader (#1021)
Browse files Browse the repository at this point in the history
  • Loading branch information
tustvold authored Dec 13, 2021
1 parent 7e44ca8 commit 07660c6
Showing 1 changed file with 33 additions and 40 deletions.
73 changes: 33 additions & 40 deletions parquet/src/arrow/record_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,8 @@ pub struct RecordReader<T: DataType> {
/// Number of values `num_records` contains.
num_values: usize,

values_seen: usize,
/// Starts from 1, number of values have been written to buffer
values_written: usize,
in_middle_of_record: bool,
}

impl<T: DataType> RecordReader<T> {
Expand Down Expand Up @@ -75,9 +73,7 @@ impl<T: DataType> RecordReader<T> {
column_desc: column_schema,
num_records: 0,
num_values: 0,
values_seen: 0,
values_written: 0,
in_middle_of_record: false,
}
}

Expand Down Expand Up @@ -107,21 +103,25 @@ impl<T: DataType> RecordReader<T> {
loop {
// Try to find some records from buffers that has been read into memory
// but not counted as seen records.
records_read += self.split_records(num_records - records_read)?;

// Since page reader contains complete records, so if we reached end of a
// page reader, we should reach the end of a record
if end_of_column
&& self.values_seen >= self.values_written
&& self.in_middle_of_record
{
self.num_records += 1;
self.num_values = self.values_seen;
self.in_middle_of_record = false;
records_read += 1;
let (record_count, value_count) =
self.count_records(num_records - records_read);

self.num_records += record_count;
self.num_values += value_count;
records_read += record_count;

if records_read == num_records {
break;
}

if (records_read >= num_records) || end_of_column {
if end_of_column {
// Since page reader contains complete records, if we reached end of a
// page reader, we should reach the end of a record
if self.rep_levels.is_some() {
self.num_records += 1;
self.num_values = self.values_written;
records_read += 1;
}
break;
}

Expand Down Expand Up @@ -265,8 +265,6 @@ impl<T: DataType> RecordReader<T> {
self.values_written -= self.num_values;
self.num_records = 0;
self.num_values = 0;
self.values_seen = 0;
self.in_middle_of_record = false;
}

/// Returns bitmap data.
Expand Down Expand Up @@ -367,10 +365,11 @@ impl<T: DataType> RecordReader<T> {
Ok(values_read)
}

/// Split values into records according repetition definition and returns number of
/// records read.
#[allow(clippy::unnecessary_wraps)]
fn split_records(&mut self, records_to_read: usize) -> Result<usize> {
/// Inspects the buffered repetition levels in the range `self.num_values..self.values_written`
/// and returns the number of "complete" records along with the corresponding number of values
///
/// A "complete" record is one where the buffer contains a subsequent repetition level of 0
fn count_records(&self, records_to_read: usize) -> (usize, usize) {
let rep_levels = self.rep_levels.as_ref().map(|buf| {
let (prefix, rep_levels, suffix) =
unsafe { buf.as_slice().align_to::<i16>() };
Expand All @@ -381,32 +380,26 @@ impl<T: DataType> RecordReader<T> {
match rep_levels {
Some(buf) => {
let mut records_read = 0;
let mut end_of_last_record = self.num_values;

for current in self.num_values..self.values_written {
if buf[current] == 0 && current != self.num_values {
records_read += 1;
end_of_last_record = current;

while (self.values_seen < self.values_written)
&& (records_read < records_to_read)
{
if buf[self.values_seen] == 0 {
if self.in_middle_of_record {
records_read += 1;
self.num_records += 1;
self.num_values = self.values_seen;
if records_read == records_to_read {
break;
}
self.in_middle_of_record = true;
}
self.values_seen += 1;
}

Ok(records_read)
(records_read, end_of_last_record - self.num_values)
}
None => {
let records_read =
min(records_to_read, self.values_written - self.values_seen);
self.num_records += records_read;
self.num_values += records_read;
self.values_seen += records_read;
self.in_middle_of_record = false;
min(records_to_read, self.values_written - self.num_values);

Ok(records_read)
(records_read, records_read)
}
}
}
Expand Down

0 comments on commit 07660c6

Please sign in to comment.