From 67b1c3dbcec4b428a18acb3d3a759e3ea8515f36 Mon Sep 17 00:00:00 2001
From: Ritchie Vink
Date: Wed, 15 Jun 2022 12:27:16 +0200
Subject: [PATCH] fix ipc column order

---
Reviewer note: amended during review.
 * `fix_column_order` applied the sort permutation instead of its inverse,
   which only restores the requested order for self-inverse permutations
   (e.g. two swapped columns). It now inverts the permutation.
 * The test called `frame_equal` without asserting on the returned bool.
 * Generic parameters lost in extraction were restored.
 * The blob ids on the `index` line are those of the original commit and
   were not regenerated.

 polars/polars-io/src/ipc.rs | 94 ++++++++++++++++++++++++++++++++++---
 1 file changed, 87 insertions(+), 7 deletions(-)

diff --git a/polars/polars-io/src/ipc.rs b/polars/polars-io/src/ipc.rs
index 7d6b7d477936..ef5aeca77633 100644
--- a/polars/polars-io/src/ipc.rs
+++ b/polars/polars-io/src/ipc.rs
@@ -109,7 +109,6 @@ impl IpcReader {
         self
     }
 
-    #[cfg(feature = "lazy")]
     // todo! hoist to lazy crate
     pub fn finish_with_scan_ops(
         mut self,
@@ -120,14 +119,20 @@ impl IpcReader {
         let rechunk = self.rechunk;
         let metadata = read::read_file_metadata(&mut self.reader)?;
 
-        let schema = if let Some(projection) = &projection {
+        let sorted_projection = projection.clone().map(|mut proj| {
+            proj.sort_unstable();
+            proj
+        });
+
+        let schema = if let Some(projection) = &sorted_projection {
             apply_projection(&metadata.schema, projection)
         } else {
             metadata.schema.clone()
         };
 
-        let reader = read::FileReader::new(&mut self.reader, metadata, projection);
+        let reader = read::FileReader::new(&mut self.reader, metadata, sorted_projection);
 
+        let include_row_count = self.row_count.is_some();
         finish_reader(
             reader,
             rechunk,
@@ -137,6 +142,7 @@ impl IpcReader {
             &schema,
             self.row_count,
         )
+        .map(|df| fix_column_order(df, projection, include_row_count))
     }
 }
 
@@ -179,13 +185,20 @@ where
             self.projection = Some(prj);
         }
 
-        let schema = if let Some(projection) = &self.projection {
+        let sorted_projection = self.projection.clone().map(|mut proj| {
+            proj.sort_unstable();
+            proj
+        });
+
+        let schema = if let Some(projection) = &sorted_projection {
             apply_projection(&metadata.schema, projection)
         } else {
             metadata.schema.clone()
         };
 
-        let ipc_reader = read::FileReader::new(&mut self.reader, metadata, self.projection);
+        let include_row_count = self.row_count.is_some();
+        let ipc_reader =
+            read::FileReader::new(&mut self.reader, metadata.clone(), sorted_projection);
         finish_reader(
             ipc_reader,
             rechunk,
@@ -195,6 +208,44 @@ where
             &schema,
             self.row_count,
         )
+        .map(|df| fix_column_order(df, self.projection, include_row_count))
+    }
+}
+
+/// Restore the column order requested by `projection`.
+///
+/// The file reader is handed a *sorted* projection, so the columns of `df`
+/// arrive in ascending projection order; this moves every column back to the
+/// position it had in the caller's projection. When `row_count` is set, the
+/// first column of `df` is kept in front and the remaining columns are reordered.
+fn fix_column_order(df: DataFrame, projection: Option<Vec<usize>>, row_count: bool) -> DataFrame {
+    if let Some(proj) = projection {
+        let offset = if row_count { 1 } else { 0 };
+        let mut args = (0..proj.len()).zip(proj).collect::<Vec<_>>();
+        // first el of tuple is argument index
+        // second el is the projection index
+        args.sort_unstable_by_key(|tpl| tpl.1);
+        let cols = df.get_columns();
+
+        // `args[i]` describes the i-th column that was read: it has to end up at
+        // requested position `args[i].0`, so invert the sort permutation
+        let mut order = vec![0; args.len()];
+        for (read_idx, tpl) in args.iter().enumerate() {
+            order[tpl.0] = read_idx;
+        }
+
+        let iter = order.iter().map(|&idx| cols[idx + offset].clone());
+        let cols = if row_count {
+            let mut new_cols = vec![df.get_columns()[0].clone()];
+            new_cols.extend(iter);
+            new_cols
+        } else {
+            iter.collect()
+        };
+
+        DataFrame::new_no_checks(cols)
+    } else {
+        df
     }
 }
 
@@ -367,7 +418,36 @@ mod test {
             .with_columns(Some(vec!["c".to_string(), "b".to_string()]))
             .finish()
             .unwrap();
-        assert_eq!(df_read.shape(), (3, 2));
-        df_read.frame_equal(&expected);
+        assert!(df_read.frame_equal(&expected));
+
+        let mut buf: Cursor<Vec<u8>> = Cursor::new(Vec::new());
+        let mut df = df![
+            "a" => ["x", "y", "z"],
+            "b" => [123, 456, 789],
+            "c" => [4.5, 10.0, 10.0],
+            "d" => ["misc", "other", "value"],
+        ]
+        .unwrap();
+        IpcWriter::new(&mut buf)
+            .finish(&mut df)
+            .expect("ipc writer");
+        buf.set_position(0);
+        let expected = df![
+            "a" => ["x", "y", "z"],
+            "c" => [4.5, 10.0, 10.0],
+            "d" => ["misc", "other", "value"],
+            "b" => [123, 456, 789],
+        ]
+        .unwrap();
+        let df_read = IpcReader::new(buf)
+            .with_columns(Some(vec![
+                "a".to_string(),
+                "c".to_string(),
+                "d".to_string(),
+                "b".to_string(),
+            ]))
+            .finish()
+            .unwrap();
+        assert!(df_read.frame_equal(&expected));
     }
 