bump arrow2 to 0.9.1.
youngsofun committed Feb 21, 2022
1 parent 8d7e856 commit e953145
Showing 22 changed files with 132 additions and 109 deletions.
57 changes: 39 additions & 18 deletions Cargo.lock

Cargo.lock is a generated file; its diff is not rendered by default.

8 changes: 4 additions & 4 deletions common/arrow/Cargo.toml
@@ -11,7 +11,7 @@ doctest = false
 test = false
 
 [features]
-arrow-default = ["arrow/compute", "arrow/regex", "arrow/io_csv", "arrow/io_parquet", "arrow/io_json", "arrow/io_flight"]
+arrow-default = ["arrow/compute", "arrow/regex", "arrow/io_csv", "arrow/io_parquet", "arrow/io_json", "arrow/io_flight", "arrow/compute_filter"]
 default = ["arrow-default", "parquet-default"]
 parquet-default = ["parquet2/stream", "parquet2/lz4"]
 simd = ["arrow/simd"]
@@ -20,9 +20,9 @@ simd = ["arrow/simd"]
 # Workspace dependencies
 
 # Github dependencies
-arrow = { package = "arrow2", git = "https://github.com/datafuse-extras/arrow2", default-features = false, rev = "d14ae86"}
-arrow-format = { version = "0.3.0", features = ["flight-data", "flight-service"] }
-parquet2 = { version = "0.8.1", default_features = false }
+arrow = { package = "arrow2", version="0.9.1", default-features = false}
+arrow-format = { version = "0.4.0", features = ["flight-data", "flight-service", "ipc"] }
+parquet2 = { version = "0.9.0", default_features = false }
 # Crates.io dependencies
 
 [dev-dependencies]
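
Note: arrow2 0.9 gates each compute kernel behind its own feature, which is what the new `arrow/compute_filter` entry above enables. A rough sketch of what that feature provides (not part of this commit; the array values are illustrative):

use common_arrow::arrow::array::{BooleanArray, Int32Array};
use common_arrow::arrow::compute::filter::filter;

fn main() {
    // Gated by the `arrow/compute_filter` feature added above.
    let values = Int32Array::from_slice(&[1, 2, 3]);
    let mask = BooleanArray::from_slice(&[true, false, true]);
    let kept = filter(&values, &mask).unwrap(); // keeps rows 0 and 2
    assert_eq!(kept.len(), 2);
}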
44 changes: 22 additions & 22 deletions common/datablocks/src/data_block.rs
@@ -16,8 +16,9 @@ use std::convert::TryFrom;
 use std::fmt;
 use std::sync::Arc;
 
-use common_arrow::arrow;
-use common_arrow::arrow::record_batch::RecordBatch;
+use common_arrow::arrow::array::Array;
+use common_arrow::arrow::array::ArrayRef;
+use common_arrow::arrow::chunk::Chunk;
 use common_datavalues::prelude::*;
 use common_exception::ErrorCode;
 use common_exception::Result;
@@ -174,38 +175,37 @@ impl DataBlock {
 
         Ok(Self { columns, schema })
     }
-}
-
-impl TryFrom<DataBlock> for RecordBatch {
-    type Error = ErrorCode;
 
-    fn try_from(v: DataBlock) -> Result<RecordBatch> {
-        let arrays = v
+    pub fn from_chunk(schema: &DataSchemaRef, chuck: &Chunk<ArrayRef>) -> Result<DataBlock> {
+        let columns = chuck
             .columns()
             .iter()
-            .map(|c| c.as_arrow_array())
-            .collect::<Vec<_>>();
+            .zip(schema.fields().iter())
+            .map(|(col, f)| match f.is_nullable() {
+                true => col.into_nullable_column(),
+                false => col.into_column(),
+            })
+            .collect();
 
-        Ok(RecordBatch::try_new(Arc::new(v.schema.to_arrow()), arrays)?)
+        Ok(DataBlock::create(schema.clone(), columns))
     }
 }
 
-impl TryFrom<arrow::record_batch::RecordBatch> for DataBlock {
+pub fn box_chunk_to_arc_chunk(c: Chunk<Box<dyn Array>>) -> Chunk<ArrayRef> {
+    Chunk::<ArrayRef>::new(c.into_arrays().into_iter().map(Arc::from).collect())
+}
+
+impl TryFrom<DataBlock> for Chunk<ArrayRef> {
     type Error = ErrorCode;
 
-    fn try_from(v: arrow::record_batch::RecordBatch) -> Result<DataBlock> {
-        let schema: DataSchemaRef = Arc::new(v.schema().as_ref().into());
-        let columns = v
+    fn try_from(v: DataBlock) -> Result<Chunk<ArrayRef>> {
+        let arrays = v
             .columns()
             .iter()
-            .zip(schema.fields().iter())
-            .map(|(col, f)| match f.is_nullable() {
-                true => col.into_nullable_column(),
-                false => col.into_column(),
-            })
-            .collect();
+            .map(|c| c.as_arrow_array())
+            .collect::<Vec<_>>();
 
-        Ok(DataBlock::create(schema, columns))
+        Ok(Chunk::try_new(arrays)?)
     }
 }

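Note: arrow2 0.9 renames RecordBatch to Chunk, and a Chunk carries no schema. That is why the two TryFrom impls above become a one-way DataBlock -> Chunk conversion plus an explicit from_chunk that takes the schema separately (box_chunk_to_arc_chunk adapts chunks of boxed arrays to the Arc-based ArrayRef form used here). A minimal round-trip sketch, not part of the commit:

use common_arrow::arrow::array::ArrayRef;
use common_arrow::arrow::chunk::Chunk;
use common_datablocks::DataBlock;
use common_exception::Result;

// A Chunk is just a vector of arrays, so the schema has to travel
// alongside it and be supplied again on the way back.
fn round_trip(block: DataBlock) -> Result<DataBlock> {
    let schema = block.schema().clone();
    let chunk: Chunk<ArrayRef> = block.try_into()?;
    DataBlock::from_chunk(&schema, &chunk)
}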
1 change: 1 addition & 0 deletions common/datablocks/src/lib.rs
@@ -19,6 +19,7 @@ mod data_block_debug;
 mod kernels;
 mod memory;
 
+pub use data_block::box_chunk_to_arc_chunk;
 pub use data_block::DataBlock;
 pub use data_block_debug::*;
 pub use kernels::*;
14 changes: 6 additions & 8 deletions common/datablocks/tests/it/data_block.rs
@@ -12,7 +12,8 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use common_arrow::arrow::record_batch::RecordBatch;
+use common_arrow::arrow::array::ArrayRef;
+use common_arrow::arrow::chunk::Chunk;
 use common_datablocks::DataBlock;
 use common_datavalues::prelude::*;
 use common_exception::Result;
@@ -60,18 +61,15 @@ fn test_data_block_convert() -> Result<()> {
     assert_eq!(3, block.num_rows());
     assert_eq!(4, block.num_columns());
 
-    let record_batch: RecordBatch = block.try_into().unwrap();
+    let chunk: Chunk<ArrayRef> = block.try_into().unwrap();
 
     // first and last test.
-    assert_eq!(3, record_batch.num_rows());
-    assert_eq!(4, record_batch.num_columns());
+    assert_eq!(3, chunk.len());
+    assert_eq!(4, chunk.columns().len());
 
-    let new_block: DataBlock = record_batch.try_into().unwrap();
+    let new_block: DataBlock = DataBlock::from_chunk(&schema, &chunk).unwrap();
     assert_eq!(3, new_block.num_rows());
     assert_eq!(4, new_block.num_columns());
 
-    let new_schema = new_block.schema();
-
-    assert_eq!(new_schema, &schema);
     Ok(())
 }
1 change: 1 addition & 0 deletions common/datavalues/src/columns/column.rs
@@ -161,6 +161,7 @@ impl IntoColumn for &ArrayRef {
     }
 }
 
+//impl<A: AsRef<dyn Array>> IntoColumn for A {
 impl IntoColumn for ArrayRef {
     fn into_column(self) -> ColumnRef {
         use TypeID::*;
2 changes: 1 addition & 1 deletion common/datavalues/src/data_field.rs
@@ -104,7 +104,7 @@ impl From<&ArrowField> for DataField {
     fn from(f: &ArrowField) -> Self {
         let dt: DataTypePtr = from_arrow_field(f);
 
-        DataField::new(f.name(), dt)
+        DataField::new(&f.name, dt)
     }
 }

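Note: arrow2 0.9 exposes Field members as public fields instead of accessor methods, hence `&f.name` above and `f.is_nullable` / `f.metadata` in the files below. A tiny sketch, not part of the commit:

use common_arrow::arrow::datatypes::DataType as ArrowType;
use common_arrow::arrow::datatypes::Field as ArrowField;

fn main() {
    // Members that used to be accessor methods are now plain public fields.
    let f = ArrowField::new("id", ArrowType::Int32, true);
    assert_eq!(f.name, "id"); // was f.name()
    assert!(f.is_nullable);   // was f.is_nullable()
}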
15 changes: 7 additions & 8 deletions common/datavalues/src/data_schema.rs
@@ -14,7 +14,6 @@
 
 use core::fmt;
 use std::collections::BTreeMap;
-use std::collections::HashMap;
 use std::sync::Arc;
 
 use common_arrow::arrow::datatypes::Schema as ArrowSchema;
@@ -28,25 +27,25 @@ use crate::DataField;
 #[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, Eq)]
 pub struct DataSchema {
     pub(crate) fields: Vec<DataField>,
-    pub(crate) metadata: HashMap<String, String>,
+    pub(crate) metadata: BTreeMap<String, String>,
 }
 
 impl DataSchema {
     pub fn empty() -> Self {
         Self {
             fields: vec![],
-            metadata: HashMap::new(),
+            metadata: BTreeMap::new(),
         }
     }
 
     pub fn new(fields: Vec<DataField>) -> Self {
         Self {
             fields,
-            metadata: HashMap::new(),
+            metadata: BTreeMap::new(),
         }
     }
 
-    pub fn new_from(fields: Vec<DataField>, metadata: HashMap<String, String>) -> Self {
+    pub fn new_from(fields: Vec<DataField>, metadata: BTreeMap<String, String>) -> Self {
         Self { fields, metadata }
     }

@@ -84,7 +83,7 @@ impl DataSchema {
 
     /// Returns an immutable reference to field `metadata`.
     #[inline]
-    pub const fn meta(&self) -> &HashMap<String, String> {
+    pub const fn meta(&self) -> &BTreeMap<String, String> {
         &self.metadata
     }

@@ -148,7 +147,7 @@ impl DataSchema {
             .map(|f| f.to_arrow())
             .collect::<Vec<_>>();
 
-        ArrowSchema::new_from(fields, self.metadata.clone())
+        ArrowSchema::from(fields).with_metadata(self.metadata.clone())
     }
 }

@@ -165,7 +164,7 @@ impl DataSchemaRefExt {
 impl From<&ArrowSchema> for DataSchema {
     fn from(a_schema: &ArrowSchema) -> Self {
         let fields = a_schema
-            .fields()
+            .fields
             .iter()
             .map(|arrow_f| arrow_f.into())
             .collect::<Vec<_>>();
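Note: ArrowSchema::new_from is gone in arrow2 0.9; a schema is now built from its fields, with metadata attached via with_metadata. The metadata type is an ordered map, which is why DataSchema switches from HashMap to BTreeMap above. A short sketch, not part of the commit:

use std::collections::BTreeMap;

use common_arrow::arrow::datatypes::DataType as ArrowType;
use common_arrow::arrow::datatypes::Field as ArrowField;
use common_arrow::arrow::datatypes::Schema as ArrowSchema;

fn main() {
    // Fields first, then metadata; arrow2 0.9's schema metadata is an
    // ordered BTreeMap<String, String>.
    let fields = vec![ArrowField::new("id", ArrowType::Int32, false)];
    let mut metadata = BTreeMap::new();
    metadata.insert("source".to_string(), "example".to_string());
    let schema = ArrowSchema::from(fields).with_metadata(metadata);
    assert_eq!(schema.fields.len(), 1);
}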
40 changes: 19 additions & 21 deletions common/datavalues/src/types/data_type.rs
@@ -131,7 +131,7 @@ pub fn from_arrow_type(dt: &ArrowType) -> DataTypePtr {
         ArrowType::Date64 => Arc::new(Date32Type::default()),
 
         ArrowType::Struct(fields) => {
-            let names = fields.iter().map(|f| f.name().to_string()).collect();
+            let names = fields.iter().map(|f| f.name.clone()).collect();
             let types = fields.iter().map(from_arrow_field).collect();
 
             Arc::new(StructType::create(names, types))
@@ -145,32 +145,30 @@ pub fn from_arrow_type(dt: &ArrowType) -> DataTypePtr {
 }
 
 pub fn from_arrow_field(f: &ArrowField) -> DataTypePtr {
-    if let Some(m) = f.metadata() {
-        if let Some(custom_name) = m.get(ARROW_EXTENSION_NAME) {
-            let metadata = m.get(ARROW_EXTENSION_META).cloned();
-            match custom_name.as_str() {
-                "Date" | "Date16" => return Date16Type::arc(),
-                "Date32" => return Date32Type::arc(),
-                "DateTime" | "DateTime32" => return DateTime32Type::arc(metadata),
-                "DateTime64" => match metadata {
-                    Some(meta) => {
-                        let mut chars = meta.chars();
-                        let precision = chars.next().unwrap().to_digit(10).unwrap();
-                        let tz = chars.collect::<String>();
-                        return DateTime64Type::arc(precision as usize, Some(tz));
-                    }
-                    None => return DateTime64Type::arc(3, None),
-                },
-                "Interval" => return IntervalType::arc(metadata.unwrap().into()),
-                _ => {}
-            }
-        }
-    }
+    if let Some(custom_name) = f.metadata.get(ARROW_EXTENSION_NAME) {
+        let metadata = f.metadata.get(ARROW_EXTENSION_META).cloned();
+        match custom_name.as_str() {
+            "Date" | "Date16" => return Date16Type::arc(),
+            "Date32" => return Date32Type::arc(),
+            "DateTime" | "DateTime32" => return DateTime32Type::arc(metadata),
+            "DateTime64" => match metadata {
+                Some(meta) => {
+                    let mut chars = meta.chars();
+                    let precision = chars.next().unwrap().to_digit(10).unwrap();
+                    let tz = chars.collect::<String>();
+                    return DateTime64Type::arc(precision as usize, Some(tz));
+                }
+                None => return DateTime64Type::arc(3, None),
+            },
+            "Interval" => return IntervalType::arc(metadata.unwrap().into()),
+            _ => {}
+        }
+    }
 
     let dt = f.data_type();
     let ty = from_arrow_type(dt);
 
-    let is_nullable = f.is_nullable();
+    let is_nullable = f.is_nullable;
     if is_nullable && ty.can_inside_nullable() {
         Arc::new(NullableType::create(ty))
     } else {
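Note on the DateTime64 arm above: the extension metadata packs the precision digit and the timezone into a single string. A standalone sketch of that parsing, mirroring the code above (the "3UTC" sample value is illustrative):

fn parse_datetime64_meta(meta: &str) -> (usize, String) {
    // First char is the precision digit; the rest is the timezone name.
    let mut chars = meta.chars();
    let precision = chars.next().unwrap().to_digit(10).unwrap() as usize;
    let tz = chars.collect::<String>();
    (precision, tz)
}

// parse_datetime64_meta("3UTC") -> (3, "UTC")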
(Diff truncated: the remaining changed files are not shown.)
