Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Fixed IPC projection (#1082)
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao authored Jun 19, 2022
1 parent 00bb088 commit 16d01ef
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 28 deletions.
36 changes: 18 additions & 18 deletions src/io/ipc/read/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,31 +321,31 @@ pub fn prepare_projection(

let fields = projection.iter().map(|x| fields[*x].clone()).collect();

// selected index; index in
let sorted_projection = projection
.iter()
.copied()
.enumerate()
.map(|x| (x.1, x.0))
.collect::<HashMap<_, _>>(); // e.g. [2, 1] -> {2: 0, 1: 1}
projection.sort_unstable(); // e.g. [2, 1] -> [1, 2]
// todo: find way to do this more efficiently
let mut indices = (0..projection.len()).collect::<Vec<_>>();
indices.sort_unstable_by_key(|&i| &projection[i]);
let map = indices.iter().copied().enumerate().fold(
HashMap::default(),
|mut acc, (index, new_index)| {
if !acc.contains_key(&new_index) {
acc.insert(index, new_index);
};
acc
},
);
projection.sort_unstable();

(projection, sorted_projection, fields)
(projection, map, fields)
}

pub fn apply_projection(
chunk: Chunk<Box<dyn Array>>,
projection: &[usize],
map: &HashMap<usize, usize>,
) -> Chunk<Box<dyn Array>> {
// re-order according to projection
let arrays = chunk.into_arrays();
let arrays = projection
.iter()
.map(|x| {
let index = map.get(x).unwrap();
arrays[*index].clone()
})
.collect();
let mut arrays = chunk.into_arrays();
map.iter().for_each(|(old, new)| {
arrays.swap(*old, *new);
});
Chunk::new(arrays)
}
4 changes: 2 additions & 2 deletions src/io/ipc/read/file_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,9 @@ impl<'a> FileStream<'a> {
&mut block_buffer,
).await?;

let chunk = if let Some((projection, map)) = &projection {
let chunk = if let Some((_, map)) = &projection {
// re-order according to projection
apply_projection(chunk, projection, map)
apply_projection(chunk, map)
} else {
chunk
};
Expand Down
4 changes: 2 additions & 2 deletions src/io/ipc/read/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -379,9 +379,9 @@ impl<R: Read + Seek> Iterator for FileReader<R> {
&mut self.buffer,
);

let chunk = if let Some((projection, map, _)) = &self.projection {
let chunk = if let Some((_, map, _)) = &self.projection {
// re-order according to projection
chunk.map(|chunk| apply_projection(chunk, projection, map))
chunk.map(|chunk| apply_projection(chunk, map))
} else {
chunk
};
Expand Down
26 changes: 20 additions & 6 deletions tests/it/io/ipc/read/file.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::fs::File;

use arrow2::chunk::Chunk;
use arrow2::error::Result;
use arrow2::io::ipc::read::*;

Expand Down Expand Up @@ -166,18 +167,29 @@ fn test_projection(version: &str, file_name: &str, columns: Vec<usize>) -> Resul

let metadata = read_file_metadata(&mut file)?;

let expected = columns
let (_, _, chunks) = read_gzip_json(version, file_name)?;

let expected_fields = columns
.iter()
.copied()
.map(|x| metadata.schema.fields[x].clone())
.collect::<Vec<_>>();

let mut reader = FileReader::new(&mut file, metadata, Some(columns));
let expected_chunks = chunks.into_iter().map(|chunk| {
let columns = columns
.iter()
.copied()
.map(|x| chunk.arrays()[x].clone())
.collect::<Vec<_>>();
Chunk::new(columns)
});

assert_eq!(reader.schema().fields, expected);
let reader = FileReader::new(&mut file, metadata, Some(columns.clone()));

reader.try_for_each(|rhs| {
assert_eq!(rhs?.arrays().len(), expected.len());
assert_eq!(reader.schema().fields, expected_fields);

reader.zip(expected_chunks).try_for_each(|(lhs, rhs)| {
assert_eq!(&lhs?.arrays()[0], &rhs.arrays()[0]);
Result::Ok(())
})?;
Ok(())
Expand All @@ -189,7 +201,9 @@ fn read_projected() -> Result<()> {
test_projection("1.0.0-littleendian", "generated_dictionary", vec![2])?;
test_projection("1.0.0-littleendian", "generated_nested", vec![0])?;

test_projection("1.0.0-littleendian", "generated_primitive", vec![2, 1])
test_projection("1.0.0-littleendian", "generated_primitive", vec![2, 1])?;

test_projection("1.0.0-littleendian", "generated_primitive", vec![0, 2, 1])
}

fn read_corrupted_ipc(data: Vec<u8>) -> Result<()> {
Expand Down

0 comments on commit 16d01ef

Please sign in to comment.