Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix ipc column order #3706

Merged
merged 1 commit into from
Jun 15, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 73 additions & 6 deletions polars/polars-io/src/ipc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,6 @@ impl<R: Read + Seek> IpcReader<R> {
self
}

#[cfg(feature = "lazy")]
// todo! hoist to lazy crate
pub fn finish_with_scan_ops(
mut self,
Expand All @@ -120,14 +119,20 @@ impl<R: Read + Seek> IpcReader<R> {
let rechunk = self.rechunk;
let metadata = read::read_file_metadata(&mut self.reader)?;

let schema = if let Some(projection) = &projection {
let sorted_projection = projection.clone().map(|mut proj| {
proj.sort_unstable();
proj
});

let schema = if let Some(projection) = &sorted_projection {
apply_projection(&metadata.schema, projection)
} else {
metadata.schema.clone()
};

let reader = read::FileReader::new(&mut self.reader, metadata, projection);
let reader = read::FileReader::new(&mut self.reader, metadata, sorted_projection);

let include_row_count = self.row_count.is_some();
finish_reader(
reader,
rechunk,
Expand All @@ -137,6 +142,7 @@ impl<R: Read + Seek> IpcReader<R> {
&schema,
self.row_count,
)
.map(|df| fix_column_order(df, projection, include_row_count))
}
}

Expand Down Expand Up @@ -179,13 +185,20 @@ where
self.projection = Some(prj);
}

let schema = if let Some(projection) = &self.projection {
let sorted_projection = self.projection.clone().map(|mut proj| {
proj.sort_unstable();
proj
});

let schema = if let Some(projection) = &sorted_projection {
apply_projection(&metadata.schema, projection)
} else {
metadata.schema.clone()
};

let ipc_reader = read::FileReader::new(&mut self.reader, metadata, self.projection);
let include_row_count = self.row_count.is_some();
let ipc_reader =
read::FileReader::new(&mut self.reader, metadata.clone(), sorted_projection);
finish_reader(
ipc_reader,
rechunk,
Expand All @@ -195,6 +208,31 @@ where
&schema,
self.row_count,
)
.map(|df| fix_column_order(df, self.projection, include_row_count))
}
}

fn fix_column_order(df: DataFrame, projection: Option<Vec<usize>>, row_count: bool) -> DataFrame {
if let Some(proj) = projection {
let offset = if row_count { 1 } else { 0 };
let mut args = (0..proj.len()).zip(proj).collect::<Vec<_>>();
// first el of tuple is argument index
// second el is the projection index
args.sort_unstable_by_key(|tpl| tpl.1);
let cols = df.get_columns();

let iter = args.iter().map(|tpl| cols[tpl.0 + offset].clone());
let cols = if row_count {
let mut new_cols = vec![df.get_columns()[0].clone()];
new_cols.extend(iter);
new_cols
} else {
iter.collect()
};

DataFrame::new_no_checks(cols)
} else {
df
}
}

Expand Down Expand Up @@ -367,7 +405,36 @@ mod test {
.with_columns(Some(vec!["c".to_string(), "b".to_string()]))
.finish()
.unwrap();
assert_eq!(df_read.shape(), (3, 2));
df_read.frame_equal(&expected);

let mut buf: Cursor<Vec<u8>> = Cursor::new(Vec::new());
let mut df = df![
"a" => ["x", "y", "z"],
"b" => [123, 456, 789],
"c" => [4.5, 10.0, 10.0],
"d" => ["misc", "other", "value"],
]
.unwrap();
IpcWriter::new(&mut buf)
.finish(&mut df)
.expect("ipc writer");
buf.set_position(0);
let expected = df![
"a" => ["x", "y", "z"],
"c" => [4.5, 10.0, 10.0],
"d" => ["misc", "other", "value"],
"b" => [123, 456, 789],
]
.unwrap();
let df_read = IpcReader::new(buf)
.with_columns(Some(vec![
"a".to_string(),
"c".to_string(),
"d".to_string(),
"b".to_string(),
]))
.finish()
.unwrap();
df_read.frame_equal(&expected);
}

Expand Down