Skip to content

Commit

Permalink
serializer without tmp alloc.
Browse files Browse the repository at this point in the history
  • Loading branch information
youngsofun committed May 18, 2022
1 parent 9d1b904 commit 93f8832
Show file tree
Hide file tree
Showing 7 changed files with 292 additions and 0 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions common/datavalues/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ paste = "1.0.7"
serde = { version = "1.0.136", features = ["derive"] }
serde_json = "1.0.79"
smallvec = { version = "1.8.0", features = ["write"] }
streaming-iterator = "0.1.5"
csv-core = "0.1.10"

[dev-dependencies]
criterion = "0.3.5"
Expand Down
186 changes: 186 additions & 0 deletions common/datavalues/src/types/serializations/formats/iterators.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
pub use streaming_iterator::StreamingIterator;

/// Describes how NULL rows of a column are detected and rendered during
/// serialization.
pub struct NullInfo<F>
where F: Fn(usize) -> bool
{
    /// Whether the column may contain NULLs; `new_it` uses this flag to choose
    /// between the plain and the NULL-aware streaming iterator.
    pub(crate) is_nullable: bool,
    /// Predicate called with a row index; returns `true` when that row is NULL.
    pub(crate) is_null: F,
    /// Bytes written to the output buffer in place of a NULL row.
    pub(crate) null_value: Vec<u8>,
}

impl<F> NullInfo<F>
where F: Fn(usize) -> bool
{
    /// Creates the description of a nullable column.
    ///
    /// `is_null` reports whether a given row is NULL and `null_value` is the
    /// byte sequence emitted for such rows.
    pub fn new(is_null: F, null_value: Vec<u8>) -> Self {
        Self {
            is_nullable: true,
            is_null,
            null_value,
        }
    }

    /// Creates the description of a column that never contains NULLs.
    ///
    /// The predicate is still stored (the type requires one), but
    /// `is_nullable` is `false`, so callers that branch on the flag skip the
    /// NULL handling entirely. No replacement bytes are kept.
    pub fn not_nullable(is_null: F) -> Self {
        Self {
            // This used to be `true`, which made every column look nullable
            // and defeated the non-nullable fast path.
            is_nullable: false,
            is_null,
            null_value: Vec::new(),
        }
    }
}

/// Streaming adapter that renders the items of an inner iterator, one at a
/// time, into a single byte buffer that is reused for every item.
pub struct BufStreamingIterator<I, F, T>
where
    I: Iterator<Item = T>,
    F: FnMut(T, &mut Vec<u8>),
{
    /// Source of the items to render.
    iterator: I,
    /// Callback that writes one item into the buffer.
    f: F,
    /// Holds the bytes of the item rendered most recently.
    buffer: Vec<u8>,
    /// `true` while `buffer` holds a rendered item.
    is_valid: bool,
}

/// Builds the boxed streaming iterator that matches `nullable`.
///
/// Every item of `iterator` is rendered into `buffer` by `f`. When
/// `nullable.is_nullable` is set, the NULL-aware iterator is returned;
/// otherwise the plain one is used and `nullable` is dropped.
pub fn new_it<'a, I, F, T, F2>(
    iterator: I,
    f: F,
    buffer: Vec<u8>,
    nullable: NullInfo<F2>,
) -> Box<dyn StreamingIterator<Item = [u8]> + 'a>
where
    I: Iterator<Item = T> + 'a,
    F: FnMut(T, &mut Vec<u8>) + 'a,
    T: 'a,
    F2: Fn(usize) -> bool + 'a,
{
    // Plain columns take the iterator that has no NULL predicate at all.
    if !nullable.is_nullable {
        return Box::new(BufStreamingIterator::new(iterator, f, buffer));
    }

    Box::new(NullableBufStreamingIterator::new(
        iterator, f, buffer, nullable,
    ))
}

impl<I, F, T> BufStreamingIterator<I, F, T>
where
I: Iterator<Item = T>,
F: FnMut(T, &mut Vec<u8>),
{
#[inline]
pub fn new(iterator: I, f: F, buffer: Vec<u8>) -> Self {
Self {
iterator,
f,
buffer,
is_valid: false,
}
}
}

impl<I, F, T> StreamingIterator for BufStreamingIterator<I, F, T>
where
    I: Iterator<Item = T>,
    F: FnMut(T, &mut Vec<u8>),
{
    type Item = [u8];

    /// Pulls the next item from the inner iterator and renders it into the
    /// (cleared) buffer; marks the iterator as invalid when the source yields
    /// nothing.
    #[inline]
    fn advance(&mut self) {
        match self.iterator.next() {
            Some(item) => {
                self.is_valid = true;
                self.buffer.clear();
                (self.f)(item, &mut self.buffer);
            }
            None => self.is_valid = false,
        }
    }

    /// Returns the bytes rendered by the last `advance`, or `None` when there
    /// is no current item.
    #[inline]
    fn get(&self) -> Option<&Self::Item> {
        if !self.is_valid {
            return None;
        }
        Some(self.buffer.as_slice())
    }

    /// Forwards the size hint of the inner iterator.
    #[inline]
    fn size_hint(&self) -> (usize, Option<usize>) {
        self.iterator.size_hint()
    }
}

/// Streaming adapter like `BufStreamingIterator` that additionally consults a
/// `NullInfo` to decide, per row, whether to emit the NULL bytes instead of
/// rendering the item.
pub struct NullableBufStreamingIterator<I, F, F2, T>
where
    I: Iterator<Item = T>,
    F: FnMut(T, &mut Vec<u8>),
    F2: Fn(usize) -> bool,
{
    /// Source of the items to render.
    iterator: I,
    /// Callback that writes one non-NULL item into the buffer.
    f: F,
    /// Holds the bytes of the row rendered most recently.
    buffer: Vec<u8>,
    /// `true` while `buffer` holds a rendered row.
    is_valid: bool,
    /// Row index handed to the NULL predicate.
    cursor: usize,
    /// NULL predicate together with the bytes emitted for NULL rows.
    null: NullInfo<F2>,
}

impl<I, F, F2, T> NullableBufStreamingIterator<I, F, F2, T>
where
I: Iterator<Item = T>,
F: FnMut(T, &mut Vec<u8>),
F2: Fn(usize) -> bool,
{
pub fn new(iterator: I, f: F, buffer: Vec<u8>, null: NullInfo<F2>) -> Self {
//pub fn new(iterator: I, f: F, buffer: Vec<u8>, is_null: F2, null_value: Vec<u8>) -> Self {
Self {
iterator,
f,
buffer,
is_valid: false,
null,
// is_null, null_value,
cursor: 0,
}
}
}

impl<I, F, F2, T> StreamingIterator for NullableBufStreamingIterator<I, F, F2, T>
where
    I: Iterator<Item = T>,
    F: FnMut(T, &mut Vec<u8>),
    F2: Fn(usize) -> bool,
{
    type Item = [u8];

    /// Pulls the next item from the inner iterator and fills the (cleared)
    /// buffer with either the NULL bytes or the rendered item, depending on
    /// the NULL predicate for the current row. The row cursor moves forward
    /// on every call.
    #[inline]
    fn advance(&mut self) {
        match self.iterator.next() {
            None => self.is_valid = false,
            Some(item) => {
                self.is_valid = true;
                self.buffer.clear();
                let row_is_null = (self.null.is_null)(self.cursor);
                if row_is_null {
                    // The item itself is ignored for NULL rows.
                    self.buffer.extend_from_slice(&self.null.null_value);
                } else {
                    (self.f)(item, &mut self.buffer);
                }
            }
        }
        self.cursor += 1;
    }

    /// Returns the bytes produced by the last `advance`, or `None` when there
    /// is no current row.
    #[inline]
    fn get(&self) -> Option<&Self::Item> {
        if !self.is_valid {
            return None;
        }
        Some(self.buffer.as_slice())
    }

    /// Forwards the size hint of the inner iterator.
    #[inline]
    fn size_hint(&self) -> (usize, Option<usize>) {
        self.iterator.size_hint()
    }
}
1 change: 1 addition & 0 deletions common/datavalues/src/types/serializations/formats/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod iterators;
51 changes: 51 additions & 0 deletions common/datavalues/src/types/serializations/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@ use common_io::prelude::FormatSettings;
use enum_dispatch::enum_dispatch;
use opensrv_clickhouse::types::column::ArcColumnData;
use serde_json::Value;
use streaming_iterator::StreamingIterator;

use crate::prelude::*;
mod array;
mod boolean;
mod date;
mod formats;
mod null;
mod nullable;
mod number;
Expand All @@ -43,6 +45,8 @@ pub use struct_::*;
pub use timestamp::*;
pub use variant::*;

use crate::serializations::formats::iterators::NullInfo;

#[enum_dispatch]
pub trait TypeSerializer: Send + Sync {
fn serialize_value(&self, value: &DataValue, format: &FormatSettings) -> Result<String>;
Expand Down Expand Up @@ -83,6 +87,27 @@ pub trait TypeSerializer: Send + Sync {
"Error parsing JSON: unsupported data type",
))
}

/// Serializes `column` as a stream of byte slices, one per row.
///
/// The default implementation treats the column as containing no NULLs and
/// delegates to `serialize_csv_inner`.
fn serialize_csv<'a>(
    &self,
    column: &'a ColumnRef,
    format: &FormatSettings,
) -> Result<Box<dyn StreamingIterator<Item = [u8]> + 'a>> {
    // A predicate that reports no row as NULL.
    let never_null = NullInfo::not_nullable(|_| false);
    self.serialize_csv_inner(column, format, never_null)
}

/// Serializes `column` as a stream of byte slices, one per row, using
/// `nullable` to detect and render NULL rows.
///
/// # Errors
///
/// The default implementation always returns `ErrorCode::UnImplement`;
/// serializers that support this output override the method.
fn serialize_csv_inner<'a, F2>(
    &self,
    column: &'a ColumnRef,
    format: &FormatSettings,
    nullable: NullInfo<F2>,
) -> Result<Box<dyn StreamingIterator<Item = [u8]> + 'a>>
where
    F2: Fn(usize) -> bool + 'a,
{
    // An empty message made this failure impossible to diagnose.
    Err(ErrorCode::UnImplement(
        "serialize_csv_inner is not implemented for this serializer",
    ))
}
}

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -110,3 +135,29 @@ pub enum TypeSerializerImpl {
Struct(StructSerializer),
Variant(VariantSerializer),
}

/// Checks the row-by-row output of `serialize_csv` for a plain and for a
/// nullable integer column.
#[test]
fn test_s() -> Result<()> {
    use crate::TypeSerializer;

    // Plain column: every row is rendered as its decimal text.
    let col = Series::from_data(vec![12, 23, 34]);
    let s = col.data_type().create_serializer();
    let mut stream = s.serialize_csv(&col, &FormatSettings::default())?;
    assert_eq!(stream.next(), Some(&b"12"[..]));
    assert_eq!(stream.next(), Some(&b"23"[..]));
    assert_eq!(stream.next(), Some(&b"34"[..]));
    assert_eq!(stream.next(), None);

    // Nullable column: the NULL row is replaced by the configured NULL bytes
    // (a single `\0` byte in `NullableSerializer::serialize_csv`).
    let col = Series::from_data(vec![Some(12), None, Some(34)]);
    let s = col.data_type().create_serializer();
    let mut stream = s.serialize_csv(&col, &FormatSettings::default())?;
    assert_eq!(stream.next(), Some(&b"12"[..]));
    assert_eq!(stream.next(), Some(&b"\0"[..]));
    assert_eq!(stream.next(), Some(&b"34"[..]));
    assert_eq!(stream.next(), None);
    Ok(())
}
27 changes: 27 additions & 0 deletions common/datavalues/src/types/serializations/nullable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ use common_exception::Result;
use common_io::prelude::FormatSettings;
use opensrv_clickhouse::types::column::NullableColumnData;
use serde_json::Value;
use streaming_iterator::StreamingIterator;

use crate::prelude::DataValue;
use crate::serializations::formats::iterators::NullInfo;
use crate::Column;
use crate::ColumnRef;
use crate::NullableColumn;
Expand Down Expand Up @@ -98,4 +100,29 @@ impl TypeSerializer for NullableSerializer {

Ok(Arc::new(data))
}

/// Serializes a nullable column as a stream of byte slices, one per row.
///
/// The values come from the inner serializer working on the inner column;
/// rows that `column.null_at` reports as NULL are emitted as a single `\0`
/// byte instead.
///
/// # Errors
///
/// Fails when `column` is not a `NullableColumn`, or when the inner
/// serializer fails.
fn serialize_csv<'a>(
    &self,
    column: &'a ColumnRef,
    format: &FormatSettings,
) -> Result<Box<dyn StreamingIterator<Item = [u8]> + 'a>> {
    let nullable_column: &NullableColumn = Series::check_get(column)?;
    // `move` gives the closure its own copy of the `'a` reference, so it does
    // not borrow this function's local binding.
    let null_info = NullInfo::new(move |row| column.null_at(row), vec![b'\0']);
    self.inner
        .serialize_csv_inner(nullable_column.inner(), format, null_info)
}

/// Not supported for the nullable serializer: it drives serialization from
/// `serialize_csv` and hands the NULL description to its inner serializer.
///
/// # Panics
///
/// Always panics via `unreachable!`.
fn serialize_csv_inner<'a, F2>(
    &self,
    column: &'a ColumnRef,
    format: &FormatSettings,
    nullable: NullInfo<F2>,
) -> Result<Box<dyn StreamingIterator<Item = [u8]> + 'a>>
where
    F2: Fn(usize) -> bool + 'a,
{
    unreachable!()
}
}
23 changes: 23 additions & 0 deletions common/datavalues/src/types/serializations/number.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use std::marker::PhantomData;

use common_arrow::arrow::bitmap::Bitmap;
use common_arrow::arrow::util::lexical_to_bytes_mut;
use common_exception::Result;
use common_io::prelude::FormatSettings;
use common_io::prelude::Marshal;
Expand All @@ -23,8 +24,11 @@ use opensrv_clickhouse::types::column::ArcColumnWrapper;
use opensrv_clickhouse::types::column::ColumnFrom;
use opensrv_clickhouse::types::HasSqlType;
use serde_json::Value;
use streaming_iterator::StreamingIterator;

use crate::prelude::*;
use crate::serializations::formats::iterators::new_it;
use crate::serializations::formats::iterators::NullInfo;

#[derive(Debug, Clone)]
pub struct NumberSerializer<T: PrimitiveType> {
Expand All @@ -49,6 +53,7 @@ where T: PrimitiveType
+ std::convert::From<opensrv_clickhouse::types::Value>
+ opensrv_clickhouse::io::Marshal
+ opensrv_clickhouse::io::Unmarshal<T>
+ lexical_core::ToLexical
{
fn serialize_value(&self, value: &DataValue, _format: &FormatSettings) -> Result<String> {
Ok(format!("{:?}", value))
Expand Down Expand Up @@ -107,4 +112,22 @@ where T: PrimitiveType
.collect();
Ok(result)
}

/// Serializes a primitive numeric column as a stream of byte slices, one per
/// row, writing each value with `lexical_to_bytes_mut` and letting `nullable`
/// decide which rows are rendered as NULL.
///
/// # Errors
///
/// Fails when `column` is not a `PrimitiveColumn<T>`.
fn serialize_csv_inner<'a, F2>(
    &self,
    column: &'a ColumnRef,
    format: &FormatSettings,
    nullable: NullInfo<F2>,
) -> Result<Box<dyn StreamingIterator<Item = [u8]> + 'a>>
where
    F2: Fn(usize) -> bool + 'a,
{
    let numbers: &PrimitiveColumn<T> = Series::check_get(&column)?;
    // Scratch buffer the iterator refills for every row.
    let scratch = Vec::new();
    let stream = new_it(
        numbers.iter(),
        |value, out| lexical_to_bytes_mut(*value, out),
        scratch,
        nullable,
    );
    Ok(stream)
}
}

0 comments on commit 93f8832

Please sign in to comment.