From 16d01efcd211ff29280f214d9476f2f3ba2d5c97 Mon Sep 17 00:00:00 2001 From: Jorge Leitao Date: Sun, 19 Jun 2022 04:59:08 +0200 Subject: [PATCH] Fixed IPC projection (#1082) --- src/io/ipc/read/common.rs | 36 +++++++++++++++++------------------ src/io/ipc/read/file_async.rs | 4 ++-- src/io/ipc/read/reader.rs | 4 ++-- tests/it/io/ipc/read/file.rs | 26 +++++++++++++++++++------ 4 files changed, 42 insertions(+), 28 deletions(-) diff --git a/src/io/ipc/read/common.rs b/src/io/ipc/read/common.rs index f2a9eaa8522..7e32cde5d0c 100644 --- a/src/io/ipc/read/common.rs +++ b/src/io/ipc/read/common.rs @@ -321,31 +321,31 @@ pub fn prepare_projection( let fields = projection.iter().map(|x| fields[*x].clone()).collect(); - // selected index; index in - let sorted_projection = projection - .iter() - .copied() - .enumerate() - .map(|x| (x.1, x.0)) - .collect::>(); // e.g. [2, 1] -> {2: 0, 1: 1} - projection.sort_unstable(); // e.g. [2, 1] -> [1, 2] + // todo: find way to do this more efficiently + let mut indices = (0..projection.len()).collect::>(); + indices.sort_unstable_by_key(|&i| &projection[i]); + let map = indices.iter().copied().enumerate().fold( + HashMap::default(), + |mut acc, (index, new_index)| { + if !acc.contains_key(&new_index) { + acc.insert(index, new_index); + }; + acc + }, + ); + projection.sort_unstable(); - (projection, sorted_projection, fields) + (projection, map, fields) } pub fn apply_projection( chunk: Chunk>, - projection: &[usize], map: &HashMap, ) -> Chunk> { // re-order according to projection - let arrays = chunk.into_arrays(); - let arrays = projection - .iter() - .map(|x| { - let index = map.get(x).unwrap(); - arrays[*index].clone() - }) - .collect(); + let mut arrays = chunk.into_arrays(); + map.iter().for_each(|(old, new)| { + arrays.swap(*old, *new); + }); Chunk::new(arrays) } diff --git a/src/io/ipc/read/file_async.rs b/src/io/ipc/read/file_async.rs index a4674167e3c..8397eb4a016 100644 --- a/src/io/ipc/read/file_async.rs +++ b/src/io/ipc/read/file_async.rs @@ -90,9 +90,9 @@ impl<'a> FileStream<'a> { &mut block_buffer, ).await?; - let chunk = if let Some((projection, map)) = &projection { + let chunk = if let Some((_, map)) = &projection { // re-order according to projection - apply_projection(chunk, projection, map) + apply_projection(chunk, map) } else { chunk }; diff --git a/src/io/ipc/read/reader.rs b/src/io/ipc/read/reader.rs index 1e3f4459f9e..9807c198496 100644 --- a/src/io/ipc/read/reader.rs +++ b/src/io/ipc/read/reader.rs @@ -379,9 +379,9 @@ impl Iterator for FileReader { &mut self.buffer, ); - let chunk = if let Some((projection, map, _)) = &self.projection { + let chunk = if let Some((_, map, _)) = &self.projection { // re-order according to projection - chunk.map(|chunk| apply_projection(chunk, projection, map)) + chunk.map(|chunk| apply_projection(chunk, map)) } else { chunk }; diff --git a/tests/it/io/ipc/read/file.rs b/tests/it/io/ipc/read/file.rs index 18a3190c8bf..022cf4de274 100644 --- a/tests/it/io/ipc/read/file.rs +++ b/tests/it/io/ipc/read/file.rs @@ -1,5 +1,6 @@ use std::fs::File; +use arrow2::chunk::Chunk; use arrow2::error::Result; use arrow2::io::ipc::read::*; @@ -166,18 +167,29 @@ fn test_projection(version: &str, file_name: &str, columns: Vec) -> Resul let metadata = read_file_metadata(&mut file)?; - let expected = columns + let (_, _, chunks) = read_gzip_json(version, file_name)?; + + let expected_fields = columns .iter() .copied() .map(|x| metadata.schema.fields[x].clone()) .collect::>(); - let mut reader = FileReader::new(&mut file, metadata, Some(columns)); + let expected_chunks = chunks.into_iter().map(|chunk| { + let columns = columns + .iter() + .copied() + .map(|x| chunk.arrays()[x].clone()) + .collect::>(); + Chunk::new(columns) + }); - assert_eq!(reader.schema().fields, expected); + let reader = FileReader::new(&mut file, metadata, Some(columns.clone())); - reader.try_for_each(|rhs| { - assert_eq!(rhs?.arrays().len(), expected.len()); + assert_eq!(reader.schema().fields, expected_fields); + + reader.zip(expected_chunks).try_for_each(|(lhs, rhs)| { + assert_eq!(&lhs?.arrays()[0], &rhs.arrays()[0]); Result::Ok(()) })?; Ok(()) @@ -189,7 +201,9 @@ fn read_projected() -> Result<()> { test_projection("1.0.0-littleendian", "generated_dictionary", vec![2])?; test_projection("1.0.0-littleendian", "generated_nested", vec![0])?; - test_projection("1.0.0-littleendian", "generated_primitive", vec![2, 1]) + test_projection("1.0.0-littleendian", "generated_primitive", vec![2, 1])?; + + test_projection("1.0.0-littleendian", "generated_primitive", vec![0, 2, 1]) } fn read_corrupted_ipc(data: Vec) -> Result<()> {