Arrow2 in channel data (#7)
* replaced ndarray with arrow2 in ChannelData (see the sketch below)

* removed channel_data_valid

* updated Cargo.lock

* more anyhow context

* MdBlock parsing for TX tag
ratal authored Feb 6, 2024
1 parent 813e8c1 commit 1ab8973
Showing 23 changed files with 5,765 additions and 7,891 deletions.
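
The heart of the change is swapping ChannelData's ndarray-backed storage for arrow2 arrays. As a rough, hypothetical sketch of the idea (a simplified stand-in, not mdfr's actual definition, which covers many more data types and integer widths):

use arrow2::array::{Array, Float64Array, Utf8Array};
use arrow2::datatypes::DataType;

// Simplified stand-in: each variant owns an arrow2 array instead of an ndarray buffer.
enum ChannelData {
    Float64(Float64Array),
    Utf8(Utf8Array<i64>),
}

impl ChannelData {
    // Mirrors the cn.data.boxed() calls visible in the parquet diff below.
    fn boxed(&self) -> Box<dyn Array> {
        match self {
            ChannelData::Float64(a) => a.clone().boxed(),
            ChannelData::Utf8(a) => a.clone().boxed(),
        }
    }
    // Mirrors cn.data.arrow_data_type() used when building the schema.
    fn arrow_data_type(&self) -> &DataType {
        match self {
            ChannelData::Float64(a) => a.data_type(),
            ChannelData::Utf8(a) => a.data_type(),
        }
    }
}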
875 changes: 592 additions & 283 deletions Cargo.lock

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "mdfr"
version = "0.4.3"
version = "0.5.0"
description = "A package for reading and writing MDF files"
authors = ["ratal <[email protected]>"]
edition = "2021"
@@ -17,7 +17,7 @@ anyhow = { version = "1.0", features = ["backtrace"] } # error handling
log = "0.4" # to log events
byteorder = "1.4" # for bytes conversions
binrw = "0.13" # to efficiently read blocks
-num = { version = "0.4", features = ["serde"] } # for complex numbers
+num-traits = "0.2"
half = "2" # for f16 handling
encoding_rs = "0.8" # for endian management and bytes to text conversion (utf8, SBC, UTF16)
codepage = "0.1" # to convert code page into encoding
@@ -47,7 +47,7 @@ arrow2 = { version = "0.18", features = [
"io_parquet",
"io_parquet_compression",
] } # for efficient data storing in memory
polars = { version = "0.35", features = [
polars = { version = "0.37", features = [
"dtype-full",
"object",
"fmt",
26 changes: 1 addition & 25 deletions src/c_api.rs
@@ -1,6 +1,6 @@
//! C API
use crate::mdfreader::Mdf;
-use arrow2::ffi::{export_array_to_c, export_field_to_c, ArrowArray, ArrowSchema};
+use arrow2::ffi::{export_array_to_c, ArrowArray};
use libc::c_char;
use std::ffi::{c_uchar, c_ushort, CStr, CString};

@@ -190,30 +190,6 @@ pub unsafe extern "C" fn get_channel_array(
}
}

-/// returns channel's arrow Schema.
-/// null pointer returned if not found
-#[no_mangle]
-pub unsafe extern "C" fn get_channel_schema(
-    mdf: *const Mdf,
-    channel_name: *const libc::c_char,
-) -> *const ArrowSchema {
-    let name = CStr::from_ptr(channel_name)
-        .to_str()
-        .expect("Could not convert into utf8 the file name string");
-    if let Some(mdf) = mdf.as_ref() {
-        match mdf.get_channel_field(name) {
-            Some(field) => {
-                let schema = Box::new(export_field_to_c(field));
-                let schema_ptr: *const ArrowSchema = &*schema;
-                schema_ptr
-            }
-            None => std::ptr::null::<ArrowSchema>(), // null pointers
-        }
-    } else {
-        panic!("Null pointer given for Mdf Rust object")
-    }
-}

/// export to Parquet file
/// Compression can be one of the following strings
/// "snappy", "gzip", "lzo", "brotli", "lz4", "lz4raw"
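
With get_channel_schema removed, channel data crosses the C boundary through get_channel_array alone. For reference, a minimal sketch of the arrow2 C-data-interface export pattern involved (the helper name is illustrative, not mdfr's actual API):

use arrow2::array::Array;
use arrow2::ffi::{export_array_to_c, ArrowArray};

// Box the FFI struct and hand the raw pointer to the C caller, which then
// owns it and must release it per the Arrow C data interface.
fn export_array_example(array: Box<dyn Array>) -> *const ArrowArray {
    Box::into_raw(Box::new(export_array_to_c(array)))
}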
438 changes: 166 additions & 272 deletions src/export/numpy.rs

Large diffs are not rendered by default.

160 changes: 152 additions & 8 deletions src/export/parquet.rs
@@ -1,6 +1,8 @@
//! Exporting mdf to Parquet files.
use arrow2::{
array::Array,
datatypes::DataType,
+    datatypes::{Field, Metadata, Schema},
+    error::{Error, Result},
io::parquet::write::{
array_to_columns, compress, to_parquet_schema, CompressedPage, CompressionOptions, DynIter,
@@ -9,11 +11,21 @@ use arrow2::{
},
io::parquet::{read::ParquetError, write::transverse},
};
-use rayon::iter::{IndexedParallelIterator, IntoParallelRefIterator, ParallelIterator};
+use codepage::to_encoding;
+use encoding_rs::Encoding as EncodingRs;
+use rayon::iter::{
+    IndexedParallelIterator, IntoParallelRefIterator, ParallelExtend, ParallelIterator,
+};

-use crate::mdfreader::Mdf;
+use crate::{
+    mdfinfo::{
+        mdfinfo4::{Cn4, MdfInfo4},
+        MdfInfo,
+    },
+    mdfreader::Mdf,
+};

-use std::collections::VecDeque;
+use std::collections::{HashSet, VecDeque};
use std::{fs, path::Path};

struct Bla {
@@ -60,17 +72,22 @@ pub fn export_to_parquet(mdf: &Mdf, file_name: &str, compression: Option<&str>)
// No other encoding yet implemented, to be reviewed later if needed.
let encoding_map = |_data_type: &DataType| Encoding::Plain;

+    let (arrow_data, mut arrow_schema) = mdf_data_to_arrow(mdf);
+    arrow_schema
+        .metadata
+        .insert("file_name".to_string(), file_name.to_string());

// declare encodings
-    let encodings = (mdf.arrow_schema.fields)
+    let encodings = (arrow_schema.fields)
.par_iter()
.map(|f| transverse(&f.data_type, encoding_map))
.collect::<Vec<_>>();

// derive the parquet schema (physical types) from arrow's schema.
-    let parquet_schema = to_parquet_schema(&mdf.arrow_schema)
-        .expect("Failed to create SchemaDescriptor from Schema");
+    let parquet_schema =
+        to_parquet_schema(&arrow_schema).expect("Failed to create SchemaDescriptor from Schema");

-    let row_groups = mdf.arrow_data.iter().map(|batch| {
+    let row_groups = arrow_data.iter().map(|batch| {
// write batch to pages; parallelized by rayon
let columns = batch
.par_iter()
@@ -104,7 +121,7 @@ pub fn export_to_parquet(mdf: &Mdf, file_name: &str, compression: Option<&str>)
});

let file = fs::File::create(path).expect("Failed to create file");
-    let mut writer = FileWriter::try_new(file, mdf.arrow_schema.clone(), options)
+    let mut writer = FileWriter::try_new(file, arrow_schema.clone(), options)
.expect("Failed to write parquet file");

// write data in file
@@ -130,3 +147,130 @@ pub fn parquet_compression_from_string(compression_option: Option<&str>) -> CompressionOptions
None => CompressionOptions::Uncompressed,
}
}

/// returns arrow field from cn
#[inline]
fn cn4_field(mdfinfo4: &MdfInfo4, cn: &Cn4, data_type: DataType, is_nullable: bool) -> Field {
let field = Field::new(cn.unique_name.clone(), data_type, is_nullable);
let mut metadata = Metadata::new();
if let Ok(Some(unit)) = mdfinfo4.sharable.get_tx(cn.block.cn_md_unit) {
metadata.insert("unit".to_string(), unit);
};
if let Ok(Some(desc)) = mdfinfo4.sharable.get_tx(cn.block.cn_md_comment) {
metadata.insert("description".to_string(), desc);
};
if let Some((Some(master_channel_name), _dg_pos, (_cg_pos, _rec_idd), (_cn_pos, _rec_pos))) =
mdfinfo4.channel_names_set.get(&cn.unique_name)
{
metadata.insert(
"master_channel".to_string(),
master_channel_name.to_string(),
);
}
if cn.block.cn_type == 4 {
metadata.insert(
"sync_channel".to_string(),
cn.block.cn_sync_type.to_string(),
);
}
field.with_metadata(metadata)
}

/// takes data of channel set from MdfInfo structure and returns it
/// as arrow arrays (one Vec per channel group) with the matching Schema
fn mdf_data_to_arrow(mdf: &Mdf) -> (Vec<Vec<Box<dyn Array>>>, Schema) {
let mut chunk_index: usize = 0;
let mut array_index: usize = 0;
let mut field_index: usize = 0;
let mut arrow_schema = Schema::default();
match &mdf.mdf_info {
MdfInfo::V4(mdfinfo4) => {
let mut arrow_data: Vec<Vec<Box<dyn Array>>> = Vec::with_capacity(mdfinfo4.dg.len());
arrow_schema.fields = Vec::<Field>::with_capacity(mdfinfo4.channel_names_set.len());
for (_dg_block_position, dg) in mdfinfo4.dg.iter() {
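                // gather the names of every channel stored in this data group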
let mut channel_names_present_in_dg = HashSet::new();
for channel_group in dg.cg.values() {
let cn = channel_group.channel_names.clone();
channel_names_present_in_dg.par_extend(cn);
}
if !channel_names_present_in_dg.is_empty() {
dg.cg.iter().for_each(|(_rec_id, cg)| {
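                        // invalidation bytes in the record mean values can be masked, so arrays are nullable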
let is_nullable: bool = cg.block.cg_inval_bytes > 0;
let mut columns =
Vec::<Box<dyn Array>>::with_capacity(cg.channel_names.len());
cg.cn.iter().for_each(|(_rec_pos, cn)| {
if !cn.data.is_empty() {
arrow_schema.fields.push(cn4_field(
mdfinfo4,
cn,
cn.data.arrow_data_type().clone(),
is_nullable,
));
columns.push(cn.data.boxed());
array_index += 1;
field_index += 1;
}
});
arrow_data.push(columns);
chunk_index += 1;
array_index = 0;
});
}
}
(arrow_data, arrow_schema)
}
MdfInfo::V3(mdfinfo3) => {
let mut arrow_data: Vec<Vec<Box<dyn Array>>> = Vec::with_capacity(mdfinfo3.dg.len());
arrow_schema.fields = Vec::<Field>::with_capacity(mdfinfo3.channel_names_set.len());
for (_dg_block_position, dg) in mdfinfo3.dg.iter() {
for (_rec_id, cg) in dg.cg.iter() {
let mut columns = Vec::<Box<dyn Array>>::with_capacity(cg.channel_names.len());
for (_rec_pos, cn) in cg.cn.iter() {
if !cn.data.is_empty() {
let field = Field::new(
cn.unique_name.clone(),
cn.data.arrow_data_type().clone(),
false,
);
columns.push(cn.data.boxed());
let mut metadata = Metadata::new();
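                                // the unit text lives in the conversion block; decode it with the file's code page (Windows-1252 fallback)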
if let Some(array) =
mdfinfo3.sharable.cc.get(&cn.block1.cn_cc_conversion)
{
let txt = array.0.cc_unit;
let encoding: &'static EncodingRs =
to_encoding(mdfinfo3.id_block.id_codepage)
.unwrap_or(encoding_rs::WINDOWS_1252);
let u: String = encoding.decode(&txt).0.into();
metadata.insert(
"unit".to_string(),
u.trim_end_matches(char::from(0)).to_string(),
);
};
metadata.insert("description".to_string(), cn.description.clone());
if let Some((
Some(master_channel_name),
_dg_pos,
(_cg_pos, _rec_idd),
_cn_pos,
)) = mdfinfo3.channel_names_set.get(&cn.unique_name)
{
metadata.insert(
"master_channel".to_string(),
master_channel_name.to_string(),
);
}
let field = field.with_metadata(metadata);
arrow_schema.fields.push(field);
array_index += 1;
field_index += 1;
}
}
arrow_data.push(columns);
chunk_index += 1;
array_index = 0;
}
}
(arrow_data, arrow_schema)
}
}
}
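
Downstream, export_to_parquet feeds the (arrow_data, arrow_schema) pair into arrow2's parquet FileWriter, with compression resolved by parquet_compression_from_string. A hedged sketch of assembling the write options (the specific field values here are illustrative, not necessarily what export_to_parquet picks):

use arrow2::io::parquet::write::{Version, WriteOptions};

// Illustrative: map a user-facing compression string into the writer options,
// as export_to_parquet does before creating the FileWriter.
fn write_options(compression: Option<&str>) -> WriteOptions {
    WriteOptions {
        write_statistics: true,
        compression: parquet_compression_from_string(compression),
        version: Version::V2,
        data_pagesize_limit: None,
    }
}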
