diff --git a/Cargo.lock b/Cargo.lock index 4a024fc9517bc..f5301704478d1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -942,6 +942,7 @@ dependencies = [ "common-io", "common-macros", "criterion", + "csv-core", "dyn-clone", "enum_dispatch", "itertools", @@ -956,6 +957,7 @@ dependencies = [ "serde", "serde_json", "smallvec", + "streaming-iterator", ] [[package]] diff --git a/common/datavalues/Cargo.toml b/common/datavalues/Cargo.toml index d11750e0ecce5..64adb1896d695 100644 --- a/common/datavalues/Cargo.toml +++ b/common/datavalues/Cargo.toml @@ -35,6 +35,8 @@ paste = "1.0.7" serde = { version = "1.0.136", features = ["derive"] } serde_json = "1.0.79" smallvec = { version = "1.8.0", features = ["write"] } +streaming-iterator = "0.1.5" +csv-core = "0.1.10" [dev-dependencies] criterion = "0.3.5" @@ -56,3 +58,7 @@ harness = false [[bench]] name = "data_type" harness = false + +[[bench]] +name = "csv" +harness = false diff --git a/common/datavalues/benches/csv.rs b/common/datavalues/benches/csv.rs new file mode 100644 index 0000000000000..d406646c6ed18 --- /dev/null +++ b/common/datavalues/benches/csv.rs @@ -0,0 +1,130 @@ +extern crate core; + +use common_datavalues::serializations::formats::csv; +use common_datavalues::ColumnRef; +use common_datavalues::Series; +use common_datavalues::SeriesFrom; +use criterion::criterion_group; +use criterion::criterion_main; +use criterion::BenchmarkId; +use criterion::Criterion; +use rand::distributions::Alphanumeric; +use rand::rngs::StdRng; +use rand::Rng; +use rand::SeedableRng; + +type ColumnCreator = fn(usize, Option, usize) -> ColumnRef; + +fn bench_name(typ: &str, null_name: &str, f_name: &str) -> String { + format!("{}({})_{}", typ, null_name, f_name) +} + +fn add_group(c: &mut Criterion, typ: &str, creator: ColumnCreator, item_size: usize) { + let mut group = c.benchmark_group(typ); + //let range = (10..=14); + let range = (10..=21); + range.step_by(2).for_each(|log2_size| { + let size = 2usize.pow(log2_size); + for null_density in [None, Some(0.1)] { + let null_name = match null_density { + None => "not_nullable".to_string(), + Some(f) => format!("null={}", f), + }; + + let col = creator(size, null_density, item_size); + group.bench_with_input( + BenchmarkId::new(bench_name(typ, &null_name, "index"), size), + &log2_size, + |b, _| { + b.iter(|| csv::write_by_row(&col)); + }, + ); + // group.bench_with_input( + // BenchmarkId::new(bench_name(typ, &null_name, "iter"), size), + // &log2_size, + // |b, _| { + // b.iter(|| csv::write_iterator(&col)); + // }, + // ); + group.bench_with_input( + BenchmarkId::new(bench_name(typ, &null_name, "embedded"), size), + &log2_size, + |b, _| { + b.iter(|| csv::write_embedded(&col)); + }, + ); + } + }); +} + +fn add_benchmark(c: &mut Criterion) { + add_group(c, "i32", create_primitive_array, 1); + for strlen in [10, 100] { + let typ = format!("str[{}]", strlen); + add_group(c, &typ, create_string_array, strlen); + } +} + +pub fn create_primitive_array( + size: usize, + null_density: Option, + _item_size: usize, +) -> ColumnRef { + let mut rng = StdRng::seed_from_u64(3); + match null_density { + None => { + let v = (0..size).map(|_| rng.gen()).collect::>(); + Series::from_data(v) + } + Some(null_density) => { + let v = (0..size) + .map(|_| { + if rng.gen::() < null_density { + None + } else { + Some(rng.gen()) + } + }) + .collect::>>(); + Series::from_data(v) + } + } +} + +#[allow(dead_code)] +pub fn create_string_array(size: usize, null_density: Option, item_size: usize) -> ColumnRef { + let mut rng = StdRng::seed_from_u64(3); + match null_density { + None => { + let vec: Vec = (0..item_size) + .map(|_| { + (&mut rng) + .sample_iter(&Alphanumeric) + .take(size) + .map(char::from) + .collect::() + }) + .collect(); + Series::from_data(vec) + } + Some(null_density) => { + let vec: Vec<_> = (0..item_size) + .map(|_| { + if rng.gen::() < null_density { + None + } else { + let value = (&mut rng) + .sample_iter(&Alphanumeric) + .take(size) + .collect::>(); + Some(value) + } + }) + .collect(); + Series::from_data(vec) + } + } +} + +criterion_group!(benches, add_benchmark); +criterion_main!(benches); diff --git a/common/datavalues/src/columns/array/mod.rs b/common/datavalues/src/columns/array/mod.rs index 1ea53284a0304..b922a77526999 100644 --- a/common/datavalues/src/columns/array/mod.rs +++ b/common/datavalues/src/columns/array/mod.rs @@ -179,6 +179,11 @@ impl ScalarColumn for ArrayColumn { ArrayValueRef::Indexed { column: self, idx } } + #[inline] + fn get_data_owned(&self, idx: usize) -> Self::OwnedItem { + self.get(idx).into() + } + fn scalar_iter(&self) -> Self::Iterator<'_> { ArrayValueIter::new(self) } diff --git a/common/datavalues/src/columns/primitive/mod.rs b/common/datavalues/src/columns/primitive/mod.rs index 62a00dff2d5ba..3e1145ee197b9 100644 --- a/common/datavalues/src/columns/primitive/mod.rs +++ b/common/datavalues/src/columns/primitive/mod.rs @@ -336,6 +336,9 @@ where fn get_data(&self, idx: usize) -> Self::RefItem<'_> { self.values[idx] } + fn get_data_owned(&self, idx: usize) -> Self::OwnedItem { + self.values[idx].clone() + } fn scalar_iter(&self) -> Self::Iterator<'_> { self.iter().copied() diff --git a/common/datavalues/src/scalars/column.rs b/common/datavalues/src/scalars/column.rs index ff6570955cb39..a15de64f87389 100644 --- a/common/datavalues/src/scalars/column.rs +++ b/common/datavalues/src/scalars/column.rs @@ -34,6 +34,9 @@ where for<'a> Self::OwnedItem: Scalar = Self::RefItem<'a>> // Note: get_data has bad performance, avoid call this function inside the loop // Use `iter` instead fn get_data(&self, idx: usize) -> Self::RefItem<'_>; + fn get_data_owned(&self, _idx: usize) -> Self::OwnedItem { + unimplemented!() + } /// Get iterator of this column. fn scalar_iter(&self) -> Self::Iterator<'_>; diff --git a/common/datavalues/src/types/serializations/formats/csv.rs b/common/datavalues/src/types/serializations/formats/csv.rs new file mode 100644 index 0000000000000..3357e83b13208 --- /dev/null +++ b/common/datavalues/src/types/serializations/formats/csv.rs @@ -0,0 +1,113 @@ +use common_exception::Result; +use common_io::prelude::FormatSettings; + +use crate::ColumnRef; +use crate::DataType; +use crate::TypeSerializer; + +#[allow(dead_code)] +pub fn write_vec(col: &ColumnRef) -> Vec { + let mut buf = Vec::with_capacity(1000 * 1000); + + let s = col.data_type().create_serializer(); + let v = s + .serialize_column(&col, &FormatSettings::default()) + .unwrap(); + for field in v { + buf.extend_from_slice(&field.as_bytes()); + } + buf +} + +pub fn write_by_row(col: &ColumnRef) -> Vec { + let mut buf = Vec::with_capacity(1000 * 1000); + let rows = col.len(); + let s = col.data_type().create_serializer(); + let f = &FormatSettings::default(); + for row in 0..rows { + s.write_csv_field(col, row, &mut buf, f).unwrap(); + } + buf +} + +pub fn write_iterator(col: &ColumnRef) -> Vec { + let mut buf = Vec::with_capacity(1000 * 1000); + + let s = col.data_type().create_serializer(); + let mut stream = s.serialize_csv(&col, &FormatSettings::default()).unwrap(); + while let Some(field) = stream.next() { + buf.extend_from_slice(field); + } + buf +} + +pub fn write_embedded(col: &ColumnRef) -> Vec { + let mut buf = Vec::with_capacity(1000 * 1000); + let s = col.data_type().create_serializer(); + let rows = col.len(); + let ss = s.get_csv_serializer(col).unwrap(); + let f = &FormatSettings::default(); + for row in 0..rows { + ss.write_csv_field(row, &mut buf, f).unwrap(); + } + buf +} + +#[test] +fn test_writers() -> Result<()> { + use crate::Series; + use crate::SeriesFrom; + let col = Series::from_data(vec![12u8, 23u8, 34u8]); + let exp = [49, 50, 50, 51, 51, 52]; + assert_eq!(write_iterator(&col), exp); + assert_eq!(write_by_row(&col), exp); + assert_eq!(write_embedded(&col), exp); + + let col = Series::from_data(vec![Some(12u8), None, Some(34u8)]); + let exp = [49, 50, 0, 51, 52]; + assert_eq!(write_iterator(&col), exp); + assert_eq!(write_by_row(&col), exp); + assert_eq!(write_embedded(&col), exp); + + let col = Series::from_data(vec!["12", "34"]); + let exp = "1234".to_string().as_bytes().to_vec(); + assert_eq!(write_iterator(&col), exp); + assert_eq!(write_by_row(&col), exp); + assert_eq!(write_embedded(&col), exp); + + let col = Series::from_data(vec![Some(12u8), None, Some(34u8)]); + let exp = [49, 50, 0, 51, 52]; + assert_eq!(write_iterator(&col), exp); + assert_eq!(write_by_row(&col), exp); + assert_eq!(write_embedded(&col), exp); + Ok(()) +} + +#[test] +fn test_debug() -> Result<()> { + use crate::Series; + use crate::SeriesFrom; + use crate::TypeSerializer; + // let col = Series::from_data(vec![true, false, true]); + // let col = Series::from_data(vec!["a", "a", "bc"]); + // let col = Series::from_data(vec![12, 23, 34]); + let col = Series::from_data(vec![12u8, 23u8, 34u8]); + + println!("{:?}", col); + let s = col.data_type().create_serializer(); + let mut stream = s.serialize_csv(&col, &FormatSettings::default())?; + println!("{:?}", stream.next()); + println!("{:?}", stream.next()); + println!("{:?}", stream.next()); + println!("{:?}", stream.next()); + + let col = Series::from_data(vec![Some(12), None, Some(34)]); + println!("{:?}", col); + let s = col.data_type().create_serializer(); + let mut stream = s.serialize_csv(&col, &FormatSettings::default())?; + println!("{:?}", stream.next()); + println!("{:?}", stream.next()); + println!("{:?}", stream.next()); + println!("{:?}", stream.next()); + Ok(()) +} diff --git a/common/datavalues/src/types/serializations/formats/iterators.rs b/common/datavalues/src/types/serializations/formats/iterators.rs new file mode 100644 index 0000000000000..8610bc9be128e --- /dev/null +++ b/common/datavalues/src/types/serializations/formats/iterators.rs @@ -0,0 +1,186 @@ +pub use streaming_iterator::StreamingIterator; + +pub struct NullInfo +where F: Fn(usize) -> bool +{ + pub(crate) is_nullable: bool, + pub(crate) is_null: F, + pub(crate) null_value: Vec, +} + +impl NullInfo +where F: Fn(usize) -> bool +{ + pub fn new(is_null: F, null_value: Vec) -> Self { + Self { + is_nullable: true, + is_null, + null_value, + } + } + pub fn not_nullable(is_null: F) -> Self { + Self { + is_nullable: true, + is_null, + null_value: vec![], + } + } +} + +pub struct BufStreamingIterator +where + I: Iterator, + F: FnMut(T, &mut Vec), +{ + iterator: I, + f: F, + buffer: Vec, + is_valid: bool, +} + +pub fn new_it<'a, I, F, T, F2>( + iterator: I, + f: F, + buffer: Vec, + nullable: NullInfo, +) -> Box + 'a> +where + I: Iterator + 'a, + F: FnMut(T, &mut Vec) + 'a, + T: 'a, + F2: Fn(usize) -> bool + 'a, +{ + if nullable.is_nullable { + Box::new(NullableBufStreamingIterator::new( + iterator, f, buffer, nullable, + )) + } else { + Box::new(BufStreamingIterator::new(iterator, f, buffer)) + } +} + +impl BufStreamingIterator +where + I: Iterator, + F: FnMut(T, &mut Vec), +{ + #[inline] + pub fn new(iterator: I, f: F, buffer: Vec) -> Self { + Self { + iterator, + f, + buffer, + is_valid: false, + } + } +} + +impl StreamingIterator for BufStreamingIterator +where + I: Iterator, + F: FnMut(T, &mut Vec), +{ + type Item = [u8]; + + #[inline] + fn advance(&mut self) { + let a = self.iterator.next(); + if let Some(a) = a { + self.is_valid = true; + self.buffer.clear(); + (self.f)(a, &mut self.buffer); + } else { + self.is_valid = false; + } + } + + #[inline] + fn get(&self) -> Option<&Self::Item> { + if self.is_valid { + Some(&self.buffer) + } else { + None + } + } + + #[inline] + fn size_hint(&self) -> (usize, Option) { + self.iterator.size_hint() + } +} + +pub struct NullableBufStreamingIterator +where + I: Iterator, + F: FnMut(T, &mut Vec), + F2: Fn(usize) -> bool, +{ + iterator: I, + f: F, + buffer: Vec, + is_valid: bool, + cursor: usize, + //is_null: F2, + //null_value: Vec + null: NullInfo, +} + +impl NullableBufStreamingIterator +where + I: Iterator, + F: FnMut(T, &mut Vec), + F2: Fn(usize) -> bool, +{ + pub fn new(iterator: I, f: F, buffer: Vec, null: NullInfo) -> Self { + //pub fn new(iterator: I, f: F, buffer: Vec, is_null: F2, null_value: Vec) -> Self { + Self { + iterator, + f, + buffer, + is_valid: false, + null, + // is_null, null_value, + cursor: 0, + } + } +} + +impl StreamingIterator for NullableBufStreamingIterator +where + I: Iterator, + F: FnMut(T, &mut Vec), + F2: Fn(usize) -> bool, +{ + type Item = [u8]; + + #[inline] + fn advance(&mut self) { + let a = self.iterator.next(); + if let Some(a) = a { + self.is_valid = true; + self.buffer.clear(); + if (self.null.is_null)(self.cursor) { + self.buffer.extend_from_slice(&self.null.null_value) + } else { + (self.f)(a, &mut self.buffer); + } + } else { + self.is_valid = false; + } + self.cursor += 1; + } + + #[inline] + fn get(&self) -> Option<&Self::Item> { + if self.is_valid { + Some(&self.buffer) + } else { + None + } + } + + #[inline] + fn size_hint(&self) -> (usize, Option) { + self.iterator.size_hint() + } +} diff --git a/common/datavalues/src/types/serializations/formats/mod.rs b/common/datavalues/src/types/serializations/formats/mod.rs new file mode 100644 index 0000000000000..fb2778ea9fb86 --- /dev/null +++ b/common/datavalues/src/types/serializations/formats/mod.rs @@ -0,0 +1,20 @@ +pub mod csv; +pub mod iterators; + +#[inline] +pub fn lexical_to_bytes_mut_no_clear(n: N, buf: &mut Vec) { + buf.reserve(N::FORMATTED_SIZE_DECIMAL); + let len0 = buf.len(); + unsafe { + // JUSTIFICATION + // Benefit + // Allows using the faster serializer lexical core and convert to string + // Soundness + // Length of buf is set as written length afterwards. lexical_core + // creates a valid string, so doesn't need to be checked. + let slice = + std::slice::from_raw_parts_mut(buf.as_mut_ptr().add(len0), buf.capacity() - len0); + let len = lexical_core::write(n, slice).len(); + buf.set_len(len0 + len); + } +} diff --git a/common/datavalues/src/types/serializations/mod.rs b/common/datavalues/src/types/serializations/mod.rs index 918c2e0c3a09f..849e4826c1e0e 100644 --- a/common/datavalues/src/types/serializations/mod.rs +++ b/common/datavalues/src/types/serializations/mod.rs @@ -19,11 +19,13 @@ use common_io::prelude::FormatSettings; use enum_dispatch::enum_dispatch; use opensrv_clickhouse::types::column::ArcColumnData; use serde_json::Value; +use streaming_iterator::StreamingIterator; use crate::prelude::*; mod array; mod boolean; mod date; +pub mod formats; mod null; mod nullable; mod number; @@ -43,6 +45,17 @@ pub use struct_::*; pub use timestamp::*; pub use variant::*; +use crate::serializations::formats::iterators::NullInfo; + +pub trait ColSerializer: Send + Sync { + fn write_csv_field( + &self, + row_num: usize, + buf: &mut Vec, + format: &FormatSettings, + ) -> Result<()>; +} + #[enum_dispatch] pub trait TypeSerializer: Send + Sync { fn serialize_value(&self, value: &DataValue, format: &FormatSettings) -> Result; @@ -83,6 +96,54 @@ pub trait TypeSerializer: Send + Sync { "Error parsing JSON: unsupported data type", )) } + + fn write_csv_field<'a>( + &self, + column: &ColumnRef, + row_num: usize, + buf: &mut Vec, + format: &FormatSettings, + ) -> Result<()> { + self.write_csv_field_not_null(column, row_num, buf, format) + } + + fn write_csv_field_not_null<'a>( + &self, + _column: &ColumnRef, + _row_num: usize, + _buf: &mut Vec, + _format: &FormatSettings, + ) -> Result<()> { + unimplemented!() + } + + fn serialize_csv<'a>( + &self, + column: &'a ColumnRef, + format: &FormatSettings, + ) -> Result + 'a>> { + let is_null = |_| false; + self.serialize_csv_inner(column, format, NullInfo::not_nullable(is_null)) + } + + fn serialize_csv_inner<'a, F2>( + &self, + _column: &'a ColumnRef, + _format: &FormatSettings, + _nullable: NullInfo, + ) -> Result + 'a>> + where + F2: Fn(usize) -> bool + 'a, + { + Err(ErrorCode::UnImplement("")) + } + + fn get_csv_serializer<'a>( + &self, + _column: &'a ColumnRef, + ) -> Result> { + Err(ErrorCode::UnImplement("")) + } } #[derive(Debug, Clone)] diff --git a/common/datavalues/src/types/serializations/nullable.rs b/common/datavalues/src/types/serializations/nullable.rs index 6f95f04d43538..8e25cc6611a19 100644 --- a/common/datavalues/src/types/serializations/nullable.rs +++ b/common/datavalues/src/types/serializations/nullable.rs @@ -14,12 +14,16 @@ use std::sync::Arc; +use common_arrow::arrow::bitmap::Bitmap; use common_exception::Result; use common_io::prelude::FormatSettings; use opensrv_clickhouse::types::column::NullableColumnData; use serde_json::Value; +use streaming_iterator::StreamingIterator; use crate::prelude::DataValue; +use crate::serializations::formats::iterators::NullInfo; +use crate::ColSerializer; use crate::Column; use crate::ColumnRef; use crate::NullableColumn; @@ -98,4 +102,79 @@ impl TypeSerializer for NullableSerializer { Ok(Arc::new(data)) } + + fn serialize_csv<'a>( + &self, + column: &'a ColumnRef, + format: &FormatSettings, + ) -> Result + 'a>> { + let column2: &NullableColumn = Series::check_get(&column)?; + let null_helper = NullInfo::new(|row| column.null_at(row), vec![b'\0']); + let it = self + .inner + .serialize_csv_inner(column2.inner(), format, null_helper); + it + } + + fn write_csv_field<'a>( + &self, + column: &ColumnRef, + row_num: usize, + buf: &mut Vec, + format: &FormatSettings, + ) -> Result<()> { + let column: &NullableColumn = Series::check_get(&column).unwrap(); + if !column.null_at(row_num) { + self.inner + .write_csv_field_not_null(column.inner(), row_num, buf, format) + .unwrap(); + } else { + buf.extend_from_slice(&format.csv_null); + } + Ok(()) + } + + fn serialize_csv_inner<'a, F2>( + &self, + _column: &'a ColumnRef, + _format: &FormatSettings, + _nullable: NullInfo, + ) -> Result + 'a>> + where + F2: Fn(usize) -> bool + 'a, + { + unreachable!() + } + + fn get_csv_serializer<'a>(&self, column: &'a ColumnRef) -> Result> { + let column: &NullableColumn = Series::check_get(&column)?; + + let cs = NullableColSerializer { + validity: column.ensure_validity(), + inner: self.inner.get_csv_serializer(column.inner())?, + }; + Ok(Box::new(cs)) + } +} + +//#[derive(Clone)] +pub struct NullableColSerializer<'a> { + validity: &'a Bitmap, + inner: Box, +} + +impl<'a> ColSerializer for NullableColSerializer<'a> { + fn write_csv_field( + &self, + row_num: usize, + buf: &mut Vec, + format: &FormatSettings, + ) -> Result<()> { + if self.validity.get_bit(row_num) { + self.inner.write_csv_field(row_num, buf, format).unwrap(); + } else { + buf.extend_from_slice(&format.csv_null); + } + Ok(()) + } } diff --git a/common/datavalues/src/types/serializations/number.rs b/common/datavalues/src/types/serializations/number.rs index 965a7d63ea957..2db96980dad9c 100644 --- a/common/datavalues/src/types/serializations/number.rs +++ b/common/datavalues/src/types/serializations/number.rs @@ -15,6 +15,7 @@ use std::marker::PhantomData; use common_arrow::arrow::bitmap::Bitmap; +use common_arrow::arrow::util::lexical_to_bytes_mut; use common_exception::Result; use common_io::prelude::FormatSettings; use common_io::prelude::Marshal; @@ -23,8 +24,12 @@ use opensrv_clickhouse::types::column::ArcColumnWrapper; use opensrv_clickhouse::types::column::ColumnFrom; use opensrv_clickhouse::types::HasSqlType; use serde_json::Value; +use streaming_iterator::StreamingIterator; +use crate::formats::lexical_to_bytes_mut_no_clear; use crate::prelude::*; +use crate::serializations::formats::iterators::new_it; +use crate::serializations::formats::iterators::NullInfo; #[derive(Debug, Clone)] pub struct NumberSerializer { @@ -49,6 +54,7 @@ where T: PrimitiveType + std::convert::From + opensrv_clickhouse::io::Marshal + opensrv_clickhouse::io::Unmarshal + + lexical_core::ToLexical { fn serialize_value(&self, value: &DataValue, _format: &FormatSettings) -> Result { Ok(format!("{:?}", value)) @@ -107,4 +113,64 @@ where T: PrimitiveType .collect(); Ok(result) } + + fn serialize_csv_inner<'a, F2>( + &self, + column: &'a ColumnRef, + _format: &FormatSettings, + nullable: NullInfo, + ) -> Result + 'a>> + where + F2: Fn(usize) -> bool + 'a, + { + let column2: &PrimitiveColumn = Series::check_get(&column)?; + Ok(new_it( + column2.iter(), + |x, buf| lexical_to_bytes_mut(*x, buf), + vec![], + nullable, + )) + } + + fn write_csv_field_not_null( + &self, + column: &ColumnRef, + row_num: usize, + buf: &mut Vec, + _format: &FormatSettings, + ) -> Result<()> { + //let col: &::ColumnType = unsafe { Series::static_cast(&column) }; + //let v = col.get_data_owned(row_num); + let col: &PrimitiveColumn = Series::check_get(&column).unwrap(); + let v = unsafe { col.value_unchecked(row_num) }; + lexical_to_bytes_mut_no_clear(v, buf); + Ok(()) + } + + fn get_csv_serializer<'a>(&self, column: &'a ColumnRef) -> Result> { + let column2: &PrimitiveColumn = Series::check_get(&column)?; + let s = NumberColSerialize { + values: column2.values(), + }; + Ok(Box::new(s)) + } +} + +#[derive(Clone)] +pub struct NumberColSerialize<'a, T: PrimitiveType> { + pub(crate) values: &'a [T], +} + +impl<'a, T> ColSerializer for NumberColSerialize<'a, T> +where T: PrimitiveType + lexical_core::ToLexical +{ + fn write_csv_field( + &self, + row_num: usize, + buf: &mut Vec, + _format: &FormatSettings, + ) -> Result<()> { + lexical_to_bytes_mut_no_clear(self.values[row_num].clone(), buf); + Ok(()) + } } diff --git a/common/datavalues/src/types/serializations/string.rs b/common/datavalues/src/types/serializations/string.rs index 466f3c1fbc225..515022915c1dc 100644 --- a/common/datavalues/src/types/serializations/string.rs +++ b/common/datavalues/src/types/serializations/string.rs @@ -19,7 +19,10 @@ use common_io::prelude::FormatSettings; use opensrv_clickhouse::types::column::ArcColumnWrapper; use opensrv_clickhouse::types::column::ColumnFrom; use serde_json::Value; +use streaming_iterator::StreamingIterator; +use crate::formats::iterators::new_it; +use crate::formats::iterators::NullInfo; use crate::prelude::*; #[derive(Debug, Clone)] @@ -133,4 +136,58 @@ impl TypeSerializer for StringSerializer { .collect(); Ok(result) } + + fn serialize_csv_inner<'a, F2>( + &self, + column: &'a ColumnRef, + _format: &FormatSettings, + nullable: NullInfo, + ) -> Result + 'a>> + where + F2: Fn(usize) -> bool + 'a, + { + let column2: &StringColumn = Series::check_get(&column)?; + Ok(new_it( + column2.iter(), + |x, buf| buf.extend_from_slice(x), + vec![], + nullable, + )) + } + + fn write_csv_field_not_null<'a>( + &self, + column: &ColumnRef, + row_num: usize, + buf: &mut Vec, + _format: &FormatSettings, + ) -> Result<()> { + let col: & as Scalar>::ColumnType = unsafe { Series::static_cast(&column) }; + buf.extend_from_slice(col.get_data(row_num)); + Ok(()) + } + + // move it to DataType later + fn get_csv_serializer<'a>(&self, column: &'a ColumnRef) -> Result> { + let col: &StringColumn = Series::check_get(&column)?; + let s = StringColSerializer { col }; + Ok(Box::new(s)) + } +} + +#[derive(Clone)] +pub struct StringColSerializer<'a> { + pub(crate) col: &'a StringColumn, +} + +impl<'a> ColSerializer for StringColSerializer<'a> { + fn write_csv_field( + &self, + row_num: usize, + buf: &mut Vec, + _format: &FormatSettings, + ) -> Result<()> { + buf.extend_from_slice(unsafe { self.col.value_unchecked(row_num) }); + Ok(()) + } } diff --git a/common/io/src/format_settings.rs b/common/io/src/format_settings.rs index 48bfbcae69c7a..067c77a684d86 100644 --- a/common/io/src/format_settings.rs +++ b/common/io/src/format_settings.rs @@ -28,6 +28,7 @@ pub struct FormatSettings { pub skip_header: bool, pub compression: Compression, pub timezone: Tz, + pub csv_null: Vec, } impl Default for FormatSettings { @@ -39,6 +40,7 @@ impl Default for FormatSettings { skip_header: false, compression: Compression::None, timezone: "UTC".parse::().unwrap(), + csv_null: vec![b'\0'], // for test } } }