Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

serializer without tmp alloc. #5451

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions common/datavalues/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ paste = "1.0.7"
serde = { version = "1.0.136", features = ["derive"] }
serde_json = "1.0.79"
smallvec = { version = "1.8.0", features = ["write"] }
streaming-iterator = "0.1.5"
csv-core = "0.1.10"

[dev-dependencies]
criterion = "0.3.5"
Expand All @@ -56,3 +58,7 @@ harness = false
[[bench]]
name = "data_type"
harness = false

[[bench]]
name = "csv"
harness = false
130 changes: 130 additions & 0 deletions common/datavalues/benches/csv.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
extern crate core;

use common_datavalues::serializations::formats::csv;
use common_datavalues::ColumnRef;
use common_datavalues::Series;
use common_datavalues::SeriesFrom;
use criterion::criterion_group;
use criterion::criterion_main;
use criterion::BenchmarkId;
use criterion::Criterion;
use rand::distributions::Alphanumeric;
use rand::rngs::StdRng;
use rand::Rng;
use rand::SeedableRng;

type ColumnCreator = fn(usize, Option<f32>, usize) -> ColumnRef;

fn bench_name(typ: &str, null_name: &str, f_name: &str) -> String {
format!("{}({})_{}", typ, null_name, f_name)
}

fn add_group(c: &mut Criterion, typ: &str, creator: ColumnCreator, item_size: usize) {
let mut group = c.benchmark_group(typ);
//let range = (10..=14);
let range = (10..=21);
range.step_by(2).for_each(|log2_size| {
let size = 2usize.pow(log2_size);
for null_density in [None, Some(0.1)] {
let null_name = match null_density {
None => "not_nullable".to_string(),
Some(f) => format!("null={}", f),
};

let col = creator(size, null_density, item_size);
group.bench_with_input(
BenchmarkId::new(bench_name(typ, &null_name, "index"), size),
&log2_size,
|b, _| {
b.iter(|| csv::write_by_row(&col));
},
);
// group.bench_with_input(
// BenchmarkId::new(bench_name(typ, &null_name, "iter"), size),
// &log2_size,
// |b, _| {
// b.iter(|| csv::write_iterator(&col));
// },
// );
group.bench_with_input(
BenchmarkId::new(bench_name(typ, &null_name, "embedded"), size),
&log2_size,
|b, _| {
b.iter(|| csv::write_embedded(&col));
},
);
}
});
}

fn add_benchmark(c: &mut Criterion) {
add_group(c, "i32", create_primitive_array, 1);
for strlen in [10, 100] {
let typ = format!("str[{}]", strlen);
add_group(c, &typ, create_string_array, strlen);
}
}

pub fn create_primitive_array(
size: usize,
null_density: Option<f32>,
_item_size: usize,
) -> ColumnRef {
let mut rng = StdRng::seed_from_u64(3);
match null_density {
None => {
let v = (0..size).map(|_| rng.gen()).collect::<Vec<i32>>();
Series::from_data(v)
}
Some(null_density) => {
let v = (0..size)
.map(|_| {
if rng.gen::<f32>() < null_density {
None
} else {
Some(rng.gen())
}
})
.collect::<Vec<Option<i32>>>();
Series::from_data(v)
}
}
}

#[allow(dead_code)]
pub fn create_string_array(size: usize, null_density: Option<f32>, item_size: usize) -> ColumnRef {
let mut rng = StdRng::seed_from_u64(3);
match null_density {
None => {
let vec: Vec<String> = (0..item_size)
.map(|_| {
(&mut rng)
.sample_iter(&Alphanumeric)
.take(size)
.map(char::from)
.collect::<String>()
})
.collect();
Series::from_data(vec)
}
Some(null_density) => {
let vec: Vec<_> = (0..item_size)
.map(|_| {
if rng.gen::<f32>() < null_density {
None
} else {
let value = (&mut rng)
.sample_iter(&Alphanumeric)
.take(size)
.collect::<Vec<u8>>();
Some(value)
}
})
.collect();
Series::from_data(vec)
}
}
}

criterion_group!(benches, add_benchmark);
criterion_main!(benches);
5 changes: 5 additions & 0 deletions common/datavalues/src/columns/array/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,11 @@ impl ScalarColumn for ArrayColumn {
ArrayValueRef::Indexed { column: self, idx }
}

#[inline]
fn get_data_owned(&self, idx: usize) -> Self::OwnedItem {
self.get(idx).into()
}

fn scalar_iter(&self) -> Self::Iterator<'_> {
ArrayValueIter::new(self)
}
Expand Down
3 changes: 3 additions & 0 deletions common/datavalues/src/columns/primitive/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,9 @@ where
fn get_data(&self, idx: usize) -> Self::RefItem<'_> {
self.values[idx]
}
fn get_data_owned(&self, idx: usize) -> Self::OwnedItem {
self.values[idx].clone()
}

fn scalar_iter(&self) -> Self::Iterator<'_> {
self.iter().copied()
Expand Down
3 changes: 3 additions & 0 deletions common/datavalues/src/scalars/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ where for<'a> Self::OwnedItem: Scalar<RefType<'a> = Self::RefItem<'a>>
// Note: get_data has bad performance, avoid call this function inside the loop
// Use `iter` instead
fn get_data(&self, idx: usize) -> Self::RefItem<'_>;
fn get_data_owned(&self, _idx: usize) -> Self::OwnedItem {
unimplemented!()
}

/// Get iterator of this column.
fn scalar_iter(&self) -> Self::Iterator<'_>;
Expand Down
113 changes: 113 additions & 0 deletions common/datavalues/src/types/serializations/formats/csv.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
use common_exception::Result;
use common_io::prelude::FormatSettings;

use crate::ColumnRef;
use crate::DataType;
use crate::TypeSerializer;

#[allow(dead_code)]
pub fn write_vec(col: &ColumnRef) -> Vec<u8> {
let mut buf = Vec::with_capacity(1000 * 1000);

let s = col.data_type().create_serializer();
let v = s
.serialize_column(&col, &FormatSettings::default())
.unwrap();
for field in v {
buf.extend_from_slice(&field.as_bytes());
}
buf
}

pub fn write_by_row(col: &ColumnRef) -> Vec<u8> {
let mut buf = Vec::with_capacity(1000 * 1000);
let rows = col.len();
let s = col.data_type().create_serializer();
let f = &FormatSettings::default();
for row in 0..rows {
s.write_csv_field(col, row, &mut buf, f).unwrap();
}
buf
}

pub fn write_iterator(col: &ColumnRef) -> Vec<u8> {
let mut buf = Vec::with_capacity(1000 * 1000);

let s = col.data_type().create_serializer();
let mut stream = s.serialize_csv(&col, &FormatSettings::default()).unwrap();
while let Some(field) = stream.next() {
buf.extend_from_slice(field);
}
buf
}

pub fn write_embedded(col: &ColumnRef) -> Vec<u8> {
let mut buf = Vec::with_capacity(1000 * 1000);
let s = col.data_type().create_serializer();
let rows = col.len();
let ss = s.get_csv_serializer(col).unwrap();
let f = &FormatSettings::default();
for row in 0..rows {
ss.write_csv_field(row, &mut buf, f).unwrap();
}
buf
}

#[test]
fn test_writers() -> Result<()> {
use crate::Series;
use crate::SeriesFrom;
let col = Series::from_data(vec![12u8, 23u8, 34u8]);
let exp = [49, 50, 50, 51, 51, 52];
assert_eq!(write_iterator(&col), exp);
assert_eq!(write_by_row(&col), exp);
assert_eq!(write_embedded(&col), exp);

let col = Series::from_data(vec![Some(12u8), None, Some(34u8)]);
let exp = [49, 50, 0, 51, 52];
assert_eq!(write_iterator(&col), exp);
assert_eq!(write_by_row(&col), exp);
assert_eq!(write_embedded(&col), exp);

let col = Series::from_data(vec!["12", "34"]);
let exp = "1234".to_string().as_bytes().to_vec();
assert_eq!(write_iterator(&col), exp);
assert_eq!(write_by_row(&col), exp);
assert_eq!(write_embedded(&col), exp);

let col = Series::from_data(vec![Some(12u8), None, Some(34u8)]);
let exp = [49, 50, 0, 51, 52];
assert_eq!(write_iterator(&col), exp);
assert_eq!(write_by_row(&col), exp);
assert_eq!(write_embedded(&col), exp);
Ok(())
}

#[test]
fn test_debug() -> Result<()> {
use crate::Series;
use crate::SeriesFrom;
use crate::TypeSerializer;
// let col = Series::from_data(vec![true, false, true]);
// let col = Series::from_data(vec!["a", "a", "bc"]);
// let col = Series::from_data(vec![12, 23, 34]);
let col = Series::from_data(vec![12u8, 23u8, 34u8]);

println!("{:?}", col);
let s = col.data_type().create_serializer();
let mut stream = s.serialize_csv(&col, &FormatSettings::default())?;
println!("{:?}", stream.next());
println!("{:?}", stream.next());
println!("{:?}", stream.next());
println!("{:?}", stream.next());

let col = Series::from_data(vec![Some(12), None, Some(34)]);
println!("{:?}", col);
let s = col.data_type().create_serializer();
let mut stream = s.serialize_csv(&col, &FormatSettings::default())?;
println!("{:?}", stream.next());
println!("{:?}", stream.next());
println!("{:?}", stream.next());
println!("{:?}", stream.next());
Ok(())
}
Loading