This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Added support for limit pushdown to IPC reading (#1135)
jorgecarleitao authored Jul 5, 2022
1 parent f5f6b7e commit 55e5924
Showing 33 changed files with 185 additions and 60 deletions.
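
This commit threads an optional row limit through the IPC file readers: FileReader::new and read_batch take an extra Option<usize>, and every array reader clamps how much it decodes. A minimal usage sketch follows; it assumes the new fourth argument of FileReader::new is that row limit (the third remains the column projection), and read_limited is an illustrative helper, not part of the crate.

use std::fs::File;

use arrow2::error::Result;
use arrow2::io::ipc::read;

fn read_limited(path: &str, limit: usize) -> Result<()> {
    let mut file = File::open(path)?;
    let metadata = read::read_file_metadata(&mut file)?;
    // Push the limit down so the reader decodes at most `limit` rows
    // instead of materializing every chunk in full.
    let reader = read::FileReader::new(file, metadata, None, Some(limit));
    for maybe_chunk in reader {
        let chunk = maybe_chunk?;
        println!("read a chunk with {} rows", chunk.len());
    }
    Ok(())
}
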
2 changes: 1 addition & 1 deletion examples/extension.rs
@@ -51,6 +51,6 @@ fn write_ipc<W: Write + Seek>(writer: W, array: impl Array + 'static) -> Result<
fn read_ipc(buf: &[u8]) -> Result<Chunk<Box<dyn Array>>> {
let mut cursor = Cursor::new(buf);
let metadata = read::read_file_metadata(&mut cursor)?;
let mut reader = read::FileReader::new(cursor, metadata, None);
let mut reader = read::FileReader::new(cursor, metadata, None, None);
reader.next().unwrap()
}
18 changes: 10 additions & 8 deletions examples/ipc_file_read.rs
@@ -9,7 +9,7 @@ use arrow2::io::print;

/// Simplest way: read all record batches from the file. This can be used e.g. for random access.
#[allow(clippy::type_complexity)]
fn read_batches(path: &str) -> Result<(Schema, Vec<Chunk<Box<dyn Array>>>)> {
fn read_chunks(path: &str) -> Result<(Schema, Vec<Chunk<Box<dyn Array>>>)> {
let mut file = File::open(path)?;

// read the file's metadata. At this point, we can distribute the read however we like.
@@ -18,10 +18,10 @@ fn read_batches(path: &str) -> Result<(Schema, Vec<Chunk<Box<dyn Array>>>)> {
let schema = metadata.schema.clone();

// Simplest way: use the reader, an iterator over batches.
let reader = read::FileReader::new(file, metadata, None);
let reader = read::FileReader::new(file, metadata, None, None);

let columns = reader.collect::<Result<Vec<_>>>()?;
Ok((schema, columns))
let chunks = reader.collect::<Result<Vec<_>>>()?;
Ok((schema, chunks))
}

/// Random access way: read a single record batch from the file. This can be used e.g. for random access.
@@ -36,13 +36,15 @@ fn read_batch(path: &str) -> Result<(Schema, Chunk<Box<dyn Array>>)> {
// advanced way: read the dictionary
let dictionaries = read::read_file_dictionaries(&mut file, &metadata, &mut Default::default())?;

// and the chunk
let chunk_index = 0;

let chunk = read::read_batch(
&mut file,
&dictionaries,
&metadata,
None,
None,
chunk_index,
&mut Default::default(),
&mut Default::default(),
@@ -57,12 +59,12 @@ fn main() -> Result<()> {

let file_path = &args[1];

let (schema, batches) = read_batches(file_path)?;
let (schema, chunks) = read_chunks(file_path)?;
let names = schema.fields.iter().map(|f| &f.name).collect::<Vec<_>>();
println!("{}", print::write(&batches, &names));
println!("{}", print::write(&chunks, &names));

let (schema, batch) = read_batch(file_path)?;
let (schema, chunk) = read_batch(file_path)?;
let names = schema.fields.iter().map(|f| &f.name).collect::<Vec<_>>();
println!("{}", print::write(&[batch], &names));
println!("{}", print::write(&[chunk], &names));
Ok(())
}
10 changes: 5 additions & 5 deletions examples/ipc_file_write.rs
@@ -6,15 +6,15 @@ use arrow2::datatypes::{DataType, Field, Schema};
use arrow2::error::Result;
use arrow2::io::ipc::write;

fn write_batches(path: &str, schema: Schema, columns: &[Chunk<Box<dyn Array>>]) -> Result<()> {
fn write_batches(path: &str, schema: Schema, chunks: &[Chunk<Box<dyn Array>>]) -> Result<()> {
let file = File::create(path)?;

let options = write::WriteOptions { compression: None };
let mut writer = write::FileWriter::new(file, schema, None, options);

writer.start()?;
for columns in columns {
writer.write(columns, None)?
for chunk in chunks {
writer.write(chunk, None)?
}
writer.finish()
}
@@ -34,9 +34,9 @@ fn main() -> Result<()> {
let a = Int32Array::from_slice(&[1, 2, 3, 4, 5]);
let b = Utf8Array::<i32>::from_slice(&["a", "b", "c", "d", "e"]);

let batch = Chunk::try_new(vec![a.boxed(), b.boxed()])?;
let chunk = Chunk::try_new(vec![a.boxed(), b.boxed()])?;

// write it
write_batches(file_path, schema, &[batch])?;
write_batches(file_path, schema, &[chunk])?;
Ok(())
}
2 changes: 1 addition & 1 deletion examples/parquet_write_parallel/src/main.rs
@@ -58,7 +58,7 @@ fn parallel_write(path: &str, schema: Schema, chunks: &[Chunk]) -> Result<()> {

// declare encodings
let encodings = (&schema.fields)
.par_iter()
.iter()
.map(|f| transverse(&f.data_type, encoding_map))
.collect::<Vec<_>>();

2 changes: 1 addition & 1 deletion integration-testing/src/bin/arrow-file-to-stream.rs
@@ -32,7 +32,7 @@ fn main() -> Result<()> {
let filename = args.file_name;
let mut f = File::open(filename)?;
let metadata = read::read_file_metadata(&mut f)?;
let mut reader = read::FileReader::new(f, metadata.clone(), None);
let mut reader = read::FileReader::new(f, metadata.clone(), None, None);

let options = write::WriteOptions { compression: None };
let mut writer = write::StreamWriter::new(std::io::stdout(), options);
4 changes: 2 additions & 2 deletions integration-testing/src/bin/arrow-json-integration-test.rs
@@ -80,7 +80,7 @@ fn arrow_to_json(arrow_name: &str, json_name: &str, verbose: bool) -> Result<()>

let mut arrow_file = File::open(arrow_name)?;
let metadata = read::read_file_metadata(&mut arrow_file)?;
let reader = read::FileReader::new(arrow_file, metadata.clone(), None);
let reader = read::FileReader::new(arrow_file, metadata.clone(), None, None);

let names = metadata
.schema
@@ -118,7 +118,7 @@ fn validate(arrow_name: &str, json_name: &str, verbose: bool) -> Result<()> {
// open Arrow file
let mut arrow_file = File::open(arrow_name)?;
let metadata = read::read_file_metadata(&mut arrow_file)?;
let reader = read::FileReader::new(arrow_file, metadata, None);
let reader = read::FileReader::new(arrow_file, metadata, None, None);
let arrow_schema = reader.schema();

// compare schemas
@@ -285,6 +285,7 @@ fn chunk_from_message(
fields,
ipc_schema,
None,
None,
dictionaries,
arrow_format::ipc::MetadataVersion::V5,
&mut reader,
1 change: 1 addition & 0 deletions src/io/flight/mod.rs
@@ -137,6 +137,7 @@ pub fn deserialize_batch(
fields,
ipc_schema,
None,
None,
dictionaries,
message.version()?,
&mut reader,
8 changes: 4 additions & 4 deletions src/io/ipc/mod.rs
@@ -58,10 +58,10 @@
//! // Fetch some of the data and get the reader back
//! let mut reader = File::open(&path)?;
//! let metadata = read_file_metadata(&mut reader)?;
//! let mut filereader = FileReader::new(reader, metadata, None);
//! let row1 = filereader.next().unwrap(); // [[-1, 1], [1, -1]]
//! let row2 = filereader.next().unwrap(); // [[-1, 1], [1, -1]]
//! let mut reader = filereader.into_inner();
//! let mut reader = FileReader::new(reader, metadata, None, None);
//! let row1 = reader.next().unwrap(); // [[-1, 1], [1, -1]]
//! let row2 = reader.next().unwrap(); // [[-1, 1], [1, -1]]
//! let mut reader = reader.into_inner();
//! // Do more stuff with the reader, like seeking ahead.
//! # Ok::<(), Error>(())
//! ```
5 changes: 4 additions & 1 deletion src/io/ipc/read/array/binary.rs
@@ -18,6 +18,7 @@ pub fn read_binary<O: Offset, R: Read + Seek>(
block_offset: u64,
is_little_endian: bool,
compression: Option<Compression>,
limit: Option<usize>,
scratch: &mut Vec<u8>,
) -> Result<BinaryArray<O>> {
let field_node = field_nodes.pop_front().ok_or_else(|| {
@@ -34,13 +35,15 @@
block_offset,
is_little_endian,
compression,
limit,
scratch,
)?;

let length: usize = field_node
.length()
.try_into()
.map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;
let length = limit.map(|limit| limit.min(length)).unwrap_or(length);

let offsets: Buffer<O> = read_buffer(
buffers,
Expand All @@ -54,7 +57,7 @@ pub fn read_binary<O: Offset, R: Read + Seek>(
// Older versions of the IPC format sometimes do not report an offset
.or_else(|_| Result::Ok(Buffer::<O>::from(vec![O::default()])))?;

let last_offset = offsets.as_slice()[offsets.len() - 1].to_usize();
let last_offset = offsets.last().unwrap().to_usize();
let values = read_buffer(
buffers,
last_offset,
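
Each array reader clamps the field node's declared length against the pushed-down limit with limit.map(|limit| limit.min(length)).unwrap_or(length). A standalone sketch of that clamping, using an illustrative helper name that is not part of the crate:

/// Decode the declared length unless a smaller limit was pushed down.
fn clamped_length(length: usize, limit: Option<usize>) -> usize {
    limit.map(|limit| limit.min(length)).unwrap_or(length)
}

#[test]
fn clamping_examples() {
    assert_eq!(clamped_length(10, None), 10); // no limit: keep the full length
    assert_eq!(clamped_length(10, Some(3)), 3); // a smaller limit wins
    assert_eq!(clamped_length(2, Some(3)), 2); // a larger limit is a no-op
}
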
13 changes: 8 additions & 5 deletions src/io/ipc/read/array/boolean.rs
@@ -17,6 +17,7 @@ pub fn read_boolean<R: Read + Seek>(
block_offset: u64,
is_little_endian: bool,
compression: Option<Compression>,
limit: Option<usize>,
scratch: &mut Vec<u8>,
) -> Result<BooleanArray> {
let field_node = field_nodes.pop_front().ok_or_else(|| {
@@ -26,21 +27,23 @@
))
})?;

let length: usize = field_node
.length()
.try_into()
.map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;

let validity = read_validity(
buffers,
field_node,
reader,
block_offset,
is_little_endian,
compression,
limit,
scratch,
)?;

let length: usize = field_node
.length()
.try_into()
.map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;
let length = limit.map(|limit| limit.min(length)).unwrap_or(length);

let values = read_bitmap(
buffers,
length,
2 changes: 2 additions & 0 deletions src/io/ipc/read/array/dictionary.rs
@@ -18,6 +18,7 @@ pub fn read_dictionary<T: DictionaryKey, R: Read + Seek>(
dictionaries: &Dictionaries,
block_offset: u64,
compression: Option<Compression>,
limit: Option<usize>,
is_little_endian: bool,
scratch: &mut Vec<u8>,
) -> Result<DictionaryArray<T>>
@@ -48,6 +49,7 @@ where
block_offset,
is_little_endian,
compression,
limit,
scratch,
)?;

3 changes: 3 additions & 0 deletions src/io/ipc/read/array/fixed_size_binary.rs
@@ -17,6 +17,7 @@ pub fn read_fixed_size_binary<R: Read + Seek>(
block_offset: u64,
is_little_endian: bool,
compression: Option<Compression>,
limit: Option<usize>,
scratch: &mut Vec<u8>,
) -> Result<FixedSizeBinaryArray> {
let field_node = field_nodes.pop_front().ok_or_else(|| {
@@ -33,13 +34,15 @@ pub fn read_fixed_size_binary<R: Read + Seek>(
block_offset,
is_little_endian,
compression,
limit,
scratch,
)?;

let length: usize = field_node
.length()
.try_into()
.map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;
let length = limit.map(|limit| limit.min(length)).unwrap_or(length);

let length = length.saturating_mul(FixedSizeBinaryArray::maybe_get_size(&data_type)?);
let values = read_buffer(
7 changes: 6 additions & 1 deletion src/io/ipc/read/array/fixed_size_list.rs
@@ -21,6 +21,7 @@ pub fn read_fixed_size_list<R: Read + Seek>(
block_offset: u64,
is_little_endian: bool,
compression: Option<Compression>,
limit: Option<usize>,
version: Version,
scratch: &mut Vec<u8>,
) -> Result<FixedSizeListArray> {
@@ -38,10 +39,13 @@
block_offset,
is_little_endian,
compression,
limit,
scratch,
)?;

let (field, _) = FixedSizeListArray::get_child_and_size(&data_type);
let (field, size) = FixedSizeListArray::get_child_and_size(&data_type);

let limit = limit.map(|x| x.saturating_mul(size));

let values = read(
field_nodes,
@@ -53,6 +57,7 @@ pub fn read_fixed_size_list<R: Read + Seek>(
block_offset,
is_little_endian,
compression,
limit,
version,
scratch,
)?;
6 changes: 6 additions & 0 deletions src/io/ipc/read/array/list.rs
@@ -23,6 +23,7 @@ pub fn read_list<O: Offset, R: Read + Seek>(
block_offset: u64,
is_little_endian: bool,
compression: Option<Compression>,
limit: Option<usize>,
version: Version,
scratch: &mut Vec<u8>,
) -> Result<ListArray<O>>
@@ -43,13 +44,15 @@ where
block_offset,
is_little_endian,
compression,
limit,
scratch,
)?;

let length: usize = field_node
.length()
.try_into()
.map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;
let length = limit.map(|limit| limit.min(length)).unwrap_or(length);

let offsets = read_buffer::<O, _>(
buffers,
@@ -63,6 +66,8 @@ where
// Older versions of the IPC format sometimes do not report an offset
.or_else(|_| Result::Ok(Buffer::<O>::from(vec![O::default()])))?;

let last_offset = offsets.last().unwrap().to_usize();

let field = ListArray::<O>::get_child_field(&data_type);

let values = read(
@@ -75,6 +80,7 @@ where
block_offset,
is_little_endian,
compression,
Some(last_offset),
version,
scratch,
)?;
6 changes: 6 additions & 0 deletions src/io/ipc/read/array/map.rs
@@ -22,6 +22,7 @@ pub fn read_map<R: Read + Seek>(
block_offset: u64,
is_little_endian: bool,
compression: Option<Compression>,
limit: Option<usize>,
version: Version,
scratch: &mut Vec<u8>,
) -> Result<MapArray> {
@@ -39,13 +40,15 @@ pub fn read_map<R: Read + Seek>(
block_offset,
is_little_endian,
compression,
limit,
scratch,
)?;

let length: usize = field_node
.length()
.try_into()
.map_err(|_| Error::from(OutOfSpecKind::NegativeFooterLength))?;
let length = limit.map(|limit| limit.min(length)).unwrap_or(length);

let offsets = read_buffer::<i32, _>(
buffers,
@@ -61,6 +64,8 @@

let field = MapArray::get_field(&data_type);

let last_offset: usize = offsets.last().copied().unwrap() as usize;

let field = read(
field_nodes,
field,
@@ -71,6 +76,7 @@
block_offset,
is_little_endian,
compression,
Some(last_offset),
version,
scratch,
)?;
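
For nested types the row limit has to be translated into a limit on the child values: fixed-size lists multiply it by the list size, while variable-size lists and maps pass down the last offset of the already-clamped offsets buffer. A hedged sketch of that translation; both helper names are illustrative, not the crate's API:

/// FixedSizeList: each kept row owns exactly `size` child values.
fn fixed_size_child_limit(limit: Option<usize>, size: usize) -> Option<usize> {
    limit.map(|x| x.saturating_mul(size))
}

/// List/Map: the last offset of the clamped offsets buffer bounds how many child
/// values the kept rows reference. Assumes a non-empty buffer, as Arrow offsets
/// buffers always hold length + 1 entries.
fn list_child_limit(offsets: &[i32]) -> usize {
    *offsets.last().unwrap() as usize
}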