Skip to content

Commit

Permalink
[tmp] bench.
Browse files Browse the repository at this point in the history
  • Loading branch information
youngsofun committed May 20, 2022
1 parent ec16ca1 commit ed464ae
Show file tree
Hide file tree
Showing 10 changed files with 191 additions and 33 deletions.
4 changes: 4 additions & 0 deletions common/datavalues/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,7 @@ harness = false
[[bench]]
name = "data_type"
harness = false

[[bench]]
name = "csv"
harness = false
45 changes: 45 additions & 0 deletions common/datavalues/benches/csv.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
extern crate core;

use common_datavalues::serializations::formats::csv;
use common_datavalues::ColumnRef;
use common_datavalues::Series;
use common_datavalues::SeriesFrom;
use criterion::criterion_group;
use criterion::criterion_main;
use criterion::Criterion;
use rand::rngs::StdRng;
use rand::Rng;
use rand::SeedableRng;

/// Registers the CSV serialization benchmarks with Criterion.
///
/// For each exponent 10, 12, ..., 20 a random `i32` column with `2^exponent`
/// rows is built once and then serialized repeatedly through both
/// `csv::write_by_row` and `csv::write_iterator`.
fn add_benchmark(c: &mut Criterion) {
    for log2_size in (10..=21).step_by(2) {
        let rows = 2usize.pow(log2_size);
        let column = create_primitive_array(rows);

        let by_row_id = format!("i32 2^{} not null, write_by_row", log2_size);
        c.bench_function(&by_row_id, |bencher| {
            bencher.iter(|| csv::write_by_row(&column));
        });

        let iterator_id = format!("i32 2^{} not null, write_iterator", log2_size);
        c.bench_function(&iterator_id, |bencher| {
            bencher.iter(|| csv::write_iterator(&column));
        });
    }
}

criterion_group!(benches, add_benchmark);
criterion_main!(benches);

/// Builds a column holding `size` pseudo-random `i32` values.
///
/// Values are drawn from the generator returned by `seedable_rng`.
pub fn create_primitive_array(size: usize) -> ColumnRef {
    let mut rng = seedable_rng();

    let mut values: Vec<i32> = Vec::with_capacity(size);
    for _ in 0..size {
        values.push(rng.gen());
    }
    Series::from_data(values)
}

/// Returns a `StdRng` seeded with a fixed value so generated benchmark data is
/// the same on every run.
pub fn seedable_rng() -> StdRng {
    const SEED: u64 = 42;
    StdRng::seed_from_u64(SEED)
}
5 changes: 5 additions & 0 deletions common/datavalues/src/columns/array/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,11 @@ impl ScalarColumn for ArrayColumn {
ArrayValueRef::Indexed { column: self, idx }
}

/// Returns the element at `idx` as an owned item.
///
/// The value is obtained with `self.get(idx)` and converted into
/// `Self::OwnedItem` through `Into`.
#[inline]
fn get_data_owned(&self, idx: usize) -> Self::OwnedItem {
self.get(idx).into()
}

/// Returns an `ArrayValueIter` constructed over this column.
fn scalar_iter(&self) -> Self::Iterator<'_> {
ArrayValueIter::new(self)
}
Expand Down
3 changes: 3 additions & 0 deletions common/datavalues/src/columns/primitive/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,9 @@ where
/// Returns the element of `self.values` at position `idx`.
fn get_data(&self, idx: usize) -> Self::RefItem<'_> {
self.values[idx]
}
/// Returns the element of `self.values` at position `idx` as an owned item.
///
/// The element is read by value, the same way `get_data` reads it, so no
/// explicit `clone()` is required.
fn get_data_owned(&self, idx: usize) -> Self::OwnedItem {
    self.values[idx]
}

fn scalar_iter(&self) -> Self::Iterator<'_> {
self.iter().copied()
Expand Down
3 changes: 3 additions & 0 deletions common/datavalues/src/scalars/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ where for<'a> Self::OwnedItem: Scalar<RefType<'a> = Self::RefItem<'a>>
// Note: `get_data` has bad performance; avoid calling this function inside a loop.
// Use `iter` instead.
fn get_data(&self, idx: usize) -> Self::RefItem<'_>;
/// Returns the element at `_idx` as an owned item.
///
/// # Panics
///
/// The default implementation is a placeholder that always panics via
/// `unimplemented!()`; a column type must override this method before it
/// can be called.
fn get_data_owned(&self, _idx: usize) -> Self::OwnedItem {
unimplemented!()
}

/// Get iterator of this column.
fn scalar_iter(&self) -> Self::Iterator<'_>;
Expand Down
81 changes: 81 additions & 0 deletions common/datavalues/src/types/serializations/formats/csv.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
use common_io::prelude::FormatSettings;

use crate::ColumnRef;
use crate::DataType;
use crate::TypeSerializer;

/// Serializes the whole column with `serialize_column` and concatenates the
/// bytes of every returned field into a single buffer.
///
/// Fields are appended back to back; this function writes no separator
/// between them. The buffer starts with a fixed capacity of `1000 * 1000`
/// bytes and grows as needed.
///
/// # Panics
///
/// Panics if `serialize_column` returns an error.
#[allow(dead_code)]
pub fn write_vec(col: &ColumnRef) -> Vec<u8> {
    let mut buf = Vec::with_capacity(1000 * 1000);

    let serializer = col.data_type().create_serializer();
    // `col` is already a reference, so it is passed through as-is.
    let fields = serializer
        .serialize_column(col, &FormatSettings::default())
        .unwrap();
    for field in fields {
        buf.extend_from_slice(field.as_bytes());
    }
    buf
}

/// Serializes `col` one row at a time by calling `write_csv_field` for every
/// row index and collecting the output in a single buffer.
///
/// The buffer starts with a fixed capacity of `1000 * 1000` bytes.
///
/// # Panics
///
/// Panics if `write_csv_field` returns an error for any row.
pub fn write_by_row(col: &ColumnRef) -> Vec<u8> {
    let mut out = Vec::with_capacity(1000 * 1000);
    let row_count = col.len();
    let serializer = col.data_type().create_serializer();
    let settings = FormatSettings::default();

    (0..row_count).for_each(|row| {
        serializer
            .write_csv_field(col, row, &mut out, &settings)
            .unwrap();
    });
    out
}

/// Serializes `col` through the iterator returned by `serialize_csv` and
/// appends every yielded field to a single buffer.
///
/// The buffer starts with a fixed capacity of `1000 * 1000` bytes.
///
/// # Panics
///
/// Panics if `serialize_csv` returns an error.
pub fn write_iterator(col: &ColumnRef) -> Vec<u8> {
    let mut buf = Vec::with_capacity(1000 * 1000);

    let serializer = col.data_type().create_serializer();
    // `col` is already a reference, so it is passed through as-is.
    let mut stream = serializer
        .serialize_csv(col, &FormatSettings::default())
        .unwrap();
    // NOTE(review): `stream` appears to be a streaming iterator (each item
    // borrows from the iterator itself), which is why it is driven with
    // `while let` rather than a `for` loop.
    while let Some(field) = stream.next() {
        buf.extend_from_slice(field);
    }
    buf
}

/// Checks that `write_iterator` and `write_by_row` produce the same bytes for
/// a small `u8` column.
///
/// The test returns `()` rather than a `Result`: nothing in the body is
/// fallible, and this file imports no single-parameter `Result` alias.
#[test]
fn test_2() {
    use crate::Series;
    use crate::SeriesFrom;

    let col = Series::from_data(vec![12u8, 23u8, 34u8]);
    // ASCII bytes of "122334": the three values written back to back.
    let exp = [49, 50, 50, 51, 51, 52];
    assert_eq!(write_iterator(&col), exp);
    assert_eq!(write_by_row(&col), exp);
}

/// Smoke test that prints what `serialize_csv` yields for a plain `u8` column
/// and for a column containing a `None`.
///
/// It makes no assertions; it only fails if `serialize_csv` returns an error.
/// The test returns `()` because this file imports no single-parameter
/// `Result` alias.
#[test]
fn test_s() {
    use crate::Series;
    use crate::SeriesFrom;

    let col = Series::from_data(vec![12u8, 23u8, 34u8]);
    println!("{:?}", col);
    let s = col.data_type().create_serializer();
    let mut stream = s
        .serialize_csv(&col, &FormatSettings::default())
        .unwrap();
    // Three rows plus one extra call to show what comes after the last item.
    for _ in 0..4 {
        println!("{:?}", stream.next());
    }

    let col = Series::from_data(vec![Some(12), None, Some(34)]);
    println!("{:?}", col);
    let s = col.data_type().create_serializer();
    let mut stream = s
        .serialize_csv(&col, &FormatSettings::default())
        .unwrap();
    for _ in 0..4 {
        println!("{:?}", stream.next());
    }
}
19 changes: 19 additions & 0 deletions common/datavalues/src/types/serializations/formats/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1,20 @@
pub mod csv;
pub mod iterators;

/// Appends the text form of `n`, as written by `lexical_core::write`, to the
/// end of `buf` without clearing what `buf` already holds.
///
/// `N::FORMATTED_SIZE_DECIMAL` additional bytes are reserved up front, the
/// number is formatted directly into the spare capacity after the current
/// contents, and the length of `buf` is then advanced by the number of bytes
/// written.
#[inline]
pub fn lexical_to_bytes_mut_no_clear<N: lexical_core::ToLexical>(n: N, buf: &mut Vec<u8>) {
buf.reserve(N::FORMATTED_SIZE_DECIMAL);
let len0 = buf.len();
unsafe {
// SAFETY:
// - `reserve` above guarantees `capacity >= len0 + FORMATTED_SIZE_DECIMAL`,
//   so the slice starting at offset `len0` and spanning the remaining
//   capacity stays inside the vector's allocation.
// - The length is only advanced by the number of bytes that
//   `lexical_core::write` reports as written. lexical_core creates a valid
//   string, so the output doesn't need to be checked.
// Benefit: allows using the faster serializer lexical_core and writing
// straight into the destination buffer.
// NOTE(review): the slice covers spare capacity that has not been
// initialized; this assumes `lexical_core::write` only writes to it and
// never reads from it - TODO confirm.
let slice =
std::slice::from_raw_parts_mut(buf.as_mut_ptr().add(len0), buf.capacity() - len0);
let len = lexical_core::write(n, slice).len();
buf.set_len(len0 + len);
}
}
42 changes: 13 additions & 29 deletions common/datavalues/src/types/serializations/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,16 @@ pub trait TypeSerializer: Send + Sync {
))
}

/// Intended to append the CSV text of the value at row `_row_num` of
/// `_column` to `_buf`.
///
/// The signature carries no lifetime parameter: none of the arguments or the
/// return type needs one, and this matches the number serializer's
/// implementation of the method.
///
/// # Panics
///
/// The default implementation is a placeholder that always panics via
/// `unimplemented!()`; serializers must override it to support row-wise
/// CSV output.
fn write_csv_field(
    &self,
    _column: &ColumnRef,
    _row_num: usize,
    _buf: &mut Vec<u8>,
    _format: &FormatSettings,
) -> Result<()> {
    unimplemented!()
}

fn serialize_csv<'a>(
&self,
column: &'a ColumnRef,
Expand All @@ -99,9 +109,9 @@ pub trait TypeSerializer: Send + Sync {

fn serialize_csv_inner<'a, F2>(
&self,
column: &'a ColumnRef,
format: &FormatSettings,
nullable: NullInfo<F2>,
_column: &'a ColumnRef,
_format: &FormatSettings,
_nullable: NullInfo<F2>,
) -> Result<Box<dyn StreamingIterator<Item = [u8]> + 'a>>
where
F2: Fn(usize) -> bool + 'a,
Expand Down Expand Up @@ -135,29 +145,3 @@ pub enum TypeSerializerImpl {
Struct(StructSerializer),
Variant(VariantSerializer),
}

/// Smoke test that prints what `serialize_csv` yields for a plain integer
/// column and for a column containing a `None`.
///
/// It makes no assertions; it only fails if `serialize_csv` returns an error.
/// The boolean and string columns that used to be built here were shadowed
/// before ever being used, so only the integer column is constructed.
#[test]
fn test_s() -> Result<()> {
    let col = Series::from_data(vec![12, 23, 34]);

    println!("{:?}", col);
    let s = col.data_type().create_serializer();
    let mut stream = s.serialize_csv(&col, &FormatSettings::default())?;
    // Three rows plus one extra call to show what comes after the last item.
    for _ in 0..4 {
        println!("{:?}", stream.next());
    }

    let col = Series::from_data(vec![Some(12), None, Some(34)]);
    println!("{:?}", col);
    let s = col.data_type().create_serializer();
    let mut stream = s.serialize_csv(&col, &FormatSettings::default())?;
    for _ in 0..4 {
        println!("{:?}", stream.next());
    }
    Ok(())
}
6 changes: 3 additions & 3 deletions common/datavalues/src/types/serializations/nullable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,9 @@ impl TypeSerializer for NullableSerializer {

fn serialize_csv_inner<'a, F2>(
&self,
column: &'a ColumnRef,
format: &FormatSettings,
nullable: NullInfo<F2>,
_column: &'a ColumnRef,
_format: &FormatSettings,
_nullable: NullInfo<F2>,
) -> Result<Box<dyn StreamingIterator<Item = [u8]> + 'a>>
where
F2: Fn(usize) -> bool + 'a,
Expand Down
16 changes: 15 additions & 1 deletion common/datavalues/src/types/serializations/number.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use opensrv_clickhouse::types::HasSqlType;
use serde_json::Value;
use streaming_iterator::StreamingIterator;

use crate::formats::lexical_to_bytes_mut_no_clear;
use crate::prelude::*;
use crate::serializations::formats::iterators::new_it;
use crate::serializations::formats::iterators::NullInfo;
Expand Down Expand Up @@ -116,7 +117,7 @@ where T: PrimitiveType
fn serialize_csv_inner<'a, F2>(
&self,
column: &'a ColumnRef,
format: &FormatSettings,
_format: &FormatSettings,
nullable: NullInfo<F2>,
) -> Result<Box<dyn StreamingIterator<Item = [u8]> + 'a>>
where
Expand All @@ -130,4 +131,17 @@ where T: PrimitiveType
nullable,
))
}

/// Appends the text form of the value at row `row_num` of `column` to `buf`.
///
/// The value is fetched with `get_data_owned` and formatted by
/// `lexical_to_bytes_mut_no_clear`, which leaves the existing contents of
/// `buf` in place. `_format` is not consulted.
fn write_csv_field(
    &self,
    column: &ColumnRef,
    row_num: usize,
    buf: &mut Vec<u8>,
    _format: &FormatSettings,
) -> Result<()> {
    // SAFETY: NOTE(review): `static_cast` performs no type check here, so
    // this relies on the caller passing a column whose concrete type is
    // `T`'s column type - TODO confirm every call site upholds that.
    let col: &<T as Scalar>::ColumnType = unsafe { Series::static_cast(column) };
    let value = col.get_data_owned(row_num);
    lexical_to_bytes_mut_no_clear(value, buf);
    Ok(())
}
}

0 comments on commit ed464ae

Please sign in to comment.