This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit f2a22b6

Moved dict_id to IPC-specific IO

jorgecarleitao committed Dec 26, 2021 · 1 parent f33a41f
Showing 43 changed files with 1,954 additions and 1,751 deletions.
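The theme of the commit: the dictionary id leaves the general-purpose `datatypes::Field` and moves into IPC-specific metadata (`IpcField`, `IpcSchema`) that only the IPC and Flight readers/writers see. From their usage in the diffs below, those types plausibly have a shape like the following sketch; the exact field names are inferred from usage, not confirmed by this page:

    // Hypothetical reconstruction, inferred from usage in the diffs below.
    pub struct IpcField {
        /// IPC metadata of the children of nested types, mirroring the
        /// nesting of `datatypes::Field`.
        pub fields: Vec<IpcField>,
        /// The dictionary id, now stored here instead of in `datatypes::Field`.
        pub dictionary_id: Option<i64>,
    }

    pub struct IpcSchema {
        /// One entry per field of the schema.
        pub fields: Vec<IpcField>,
        /// Replaces the bare `is_little_endian: bool` arguments removed below.
        pub is_little_endian: bool,
    }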
4 changes: 2 additions & 2 deletions examples/extension.rs
@@ -38,11 +38,11 @@ fn write_ipc<W: Write + Seek>(writer: W, array: impl Array + 'static) -> Result<
     let schema = Schema::new(vec![Field::new("a", array.data_type().clone(), false)]);
 
     let options = write::WriteOptions { compression: None };
-    let mut writer = write::FileWriter::try_new(writer, &schema, options)?;
+    let mut writer = write::FileWriter::try_new(writer, &schema, None, options)?;
 
     let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)])?;
 
-    writer.write(&batch)?;
+    writer.write(&batch, None)?;
 
     Ok(writer.into_inner())
 }
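From this hunk, `FileWriter::try_new` gained an argument between the schema and the options, and `write` gained one after the batch; passing `None` presumably asks the writer to derive default IPC metadata with no dictionary ids. A sketch of supplying explicit metadata instead; the parameter types (`Option<Vec<IpcField>>`, `Option<&[IpcField]>`) and a `Clone` impl on `IpcField` are assumptions, not confirmed by this diff:

    use arrow2::io::ipc::{write, IpcField};

    // Hypothetical: pass explicit per-field IPC metadata instead of None.
    fn write_with_ipc_fields<W: std::io::Write + std::io::Seek>(
        writer: W,
        schema: &arrow2::datatypes::Schema,
        batch: &arrow2::record_batch::RecordBatch,
    ) -> arrow2::error::Result<W> {
        // One IpcField per schema field, no dictionary ids; nested datatypes
        // would additionally need their children filled in.
        let ipc_fields: Vec<IpcField> = schema
            .fields()
            .iter()
            .map(|_| IpcField { fields: vec![], dictionary_id: None })
            .collect();

        let options = write::WriteOptions { compression: None };
        let mut writer =
            write::FileWriter::try_new(writer, schema, Some(ipc_fields.clone()), options)?;
        writer.write(batch, Some(ipc_fields.as_slice()))?;
        Ok(writer.into_inner())
    }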
2 changes: 1 addition & 1 deletion examples/json_read.rs
@@ -31,7 +31,7 @@ fn read_path(path: &str, projection: Option<Vec<&str>>) -> Result<RecordBatch> {
 
     // deserialize `rows` into a `RecordBatch`. This is CPU-intensive, has no IO,
     // and can be performed on a different thread pool via a channel.
-    read::deserialize(&rows, fields)
+    read::deserialize(rows, fields)
 }
 
 fn main() -> Result<()> {
83 changes: 83 additions & 0 deletions src/columns.rs
@@ -0,0 +1,83 @@
//! Contains [`Columns`], a container of [`Array`]s where all arrays have the
//! same length.
use std::sync::Arc;

use crate::array::Array;
use crate::error::{ArrowError, Result};
use crate::record_batch::RecordBatch;

/// A vector of [`Array`] where every array has the same length.
#[derive(Debug, Clone, PartialEq)]
pub struct Columns<A: AsRef<dyn Array>> {
    arrays: Vec<A>,
}

impl<A: AsRef<dyn Array>> Columns<A> {
    /// Creates a new [`Columns`].
    /// # Panics
    /// Iff the arrays do not have the same length
    pub fn new(arrays: Vec<A>) -> Self {
        Self::try_new(arrays).unwrap()
    }

    /// Creates a new [`Columns`].
    /// # Errors
    /// Iff the arrays do not have the same length
    pub fn try_new(arrays: Vec<A>) -> Result<Self> {
        if !arrays.is_empty() {
            let len = arrays.first().unwrap().as_ref().len();
            if arrays
                .iter()
                .map(|array| array.as_ref())
                .any(|array| array.len() != len)
            {
                return Err(ArrowError::InvalidArgumentError(
                    "Columns require all its arrays to have an equal number of rows".to_string(),
                ));
            }
        }
        Ok(Self { arrays })
    }

    /// Returns the [`Array`]s in [`Columns`].
    pub fn arrays(&self) -> &[A] {
        &self.arrays
    }

    /// Returns the length (number of rows).
    pub fn len(&self) -> usize {
        self.arrays
            .first()
            .map(|x| x.as_ref().len())
            .unwrap_or_default()
    }

    /// Consumes [`Columns`] into its underlying arrays.
    /// The arrays are guaranteed to have the same length.
    pub fn into_arrays(self) -> Vec<A> {
        self.arrays
    }
}

impl<A: AsRef<dyn Array>> From<Columns<A>> for Vec<A> {
    fn from(c: Columns<A>) -> Self {
        c.into_arrays()
    }
}

impl<A: AsRef<dyn Array>> std::ops::Deref for Columns<A> {
    type Target = [A];

    #[inline]
    fn deref(&self) -> &[A] {
        self.arrays()
    }
}

impl From<RecordBatch> for Columns<Arc<dyn Array>> {
    fn from(batch: RecordBatch) -> Self {
        Self {
            arrays: batch.into_inner().0,
        }
    }
}
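`Columns` is new in this commit, so a minimal usage sketch may help; it assumes the module is exported as `arrow2::columns` and uses `Int32Array::from_slice` as elsewhere in the crate:

    use std::sync::Arc;

    use arrow2::array::{Array, Int32Array};
    use arrow2::columns::Columns; // assumed export path of the new module
    use arrow2::error::Result;

    fn main() -> Result<()> {
        // Two arrays of equal length (3 rows each) form valid Columns.
        let a: Arc<dyn Array> = Arc::new(Int32Array::from_slice([1, 2, 3]));
        let b: Arc<dyn Array> = Arc::new(Int32Array::from_slice([4, 5, 6]));
        let columns = Columns::try_new(vec![a, b])?;
        assert_eq!(columns.len(), 3);

        // Deref to a slice exposes the individual arrays.
        assert_eq!(columns[0].len(), 3);

        // A length mismatch is rejected by try_new (new would panic instead).
        let long: Arc<dyn Array> = Arc::new(Int32Array::from_slice([1, 2, 3]));
        let short: Arc<dyn Array> = Arc::new(Int32Array::from_slice([7]));
        assert!(Columns::try_new(vec![long, short]).is_err());
        Ok(())
    }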
34 changes: 0 additions & 34 deletions src/datatypes/field.rs
@@ -31,8 +31,6 @@ pub struct Field {
     pub data_type: DataType,
     /// Whether its values can be null or not
     pub nullable: bool,
-    /// The dictionary id of this field (currently un-used)
-    pub dict_id: i64,
     /// A map of key-value pairs containing additional custom meta data.
     pub metadata: Option<BTreeMap<String, String>>,
 }
@@ -62,23 +60,6 @@ impl Field {
             name: name.into(),
             data_type,
             nullable,
-            dict_id: 0,
             metadata: None,
         }
     }
-
-    /// Creates a new field
-    pub fn new_dict<T: Into<String>>(
-        name: T,
-        data_type: DataType,
-        nullable: bool,
-        dict_id: i64,
-    ) -> Self {
-        Field {
-            name: name.into(),
-            data_type,
-            nullable,
-            dict_id,
-            metadata: None,
-        }
-    }
Expand All @@ -90,7 +71,6 @@ impl Field {
             name: self.name,
             data_type: self.data_type,
             nullable: self.nullable,
-            dict_id: self.dict_id,
             metadata: Some(metadata),
         }
     }
@@ -131,15 +111,6 @@ impl Field {
         self.nullable
     }
 
-    /// Returns the dictionary ID, if this is a dictionary type.
-    #[inline]
-    pub const fn dict_id(&self) -> Option<i64> {
-        match self.data_type {
-            DataType::Dictionary(_, _, _) => Some(self.dict_id),
-            _ => None,
-        }
-    }
-
     /// Merge field into self if it is compatible. Struct will be merged recursively.
     /// NOTE: `self` may be updated to unexpected state in case of merge failure.
     ///
@@ -175,11 +146,6 @@ impl Field {
             }
             _ => {}
         }
-        if from.dict_id != self.dict_id {
-            return Err(ArrowError::InvalidArgumentError(
-                "Fail to merge schema Field due to conflicting dict_id".to_string(),
-            ));
-        }
         match &mut self.data_type {
             DataType::Struct(nested_fields) => match &from.data_type {
                 DataType::Struct(from_nested_fields) => {
45 changes: 23 additions & 22 deletions src/io/flight/mod.rs
@@ -1,31 +1,32 @@
-use std::collections::HashMap;
 use std::convert::TryFrom;
 use std::sync::Arc;
 
 use arrow_format::flight::data::{FlightData, SchemaResult};
 use arrow_format::ipc;
 
 use crate::{
     array::*,
     datatypes::*,
     error::{ArrowError, Result},
-    io::ipc::fb_to_schema,
-    io::ipc::read::read_record_batch,
+    io::ipc::read,
     io::ipc::write,
-    io::ipc::write::common::{encoded_batch, DictionaryTracker, EncodedData, WriteOptions},
+    io::ipc::write::common::{encode_columns, DictionaryTracker, EncodedData, WriteOptions},
     record_batch::RecordBatch,
 };
 
+use super::ipc::{IpcField, IpcSchema};
+
 /// Serializes a [`RecordBatch`] to a vector of [`FlightData`] representing the serialized dictionaries
 /// and a [`FlightData`] representing the batch.
 pub fn serialize_batch(
     batch: &RecordBatch,
+    fields: &[IpcField],
     options: &WriteOptions,
 ) -> (Vec<FlightData>, FlightData) {
     let mut dictionary_tracker = DictionaryTracker::new(false);
 
+    let columns = batch.clone().into();
     let (encoded_dictionaries, encoded_batch) =
-        encoded_batch(batch, &mut dictionary_tracker, options)
+        encode_columns(&columns, fields, &mut dictionary_tracker, options)
             .expect("DictionaryTracker configured above to not error on replacement");
 
     let flight_dictionaries = encoded_dictionaries.into_iter().map(Into::into).collect();
@@ -45,46 +46,46 @@ impl From<EncodedData> for FlightData {
 }
 
 /// Serializes a [`Schema`] to [`SchemaResult`].
-pub fn serialize_schema_to_result(schema: &Schema) -> SchemaResult {
+pub fn serialize_schema_to_result(schema: &Schema, ipc_fields: &[IpcField]) -> SchemaResult {
     SchemaResult {
-        schema: schema_as_flatbuffer(schema),
+        schema: schema_as_flatbuffer(schema, ipc_fields),
     }
 }
 
 /// Serializes a [`Schema`] to [`FlightData`].
-pub fn serialize_schema(schema: &Schema) -> FlightData {
-    let data_header = schema_as_flatbuffer(schema);
+pub fn serialize_schema(schema: &Schema, ipc_fields: &[IpcField]) -> FlightData {
+    let data_header = schema_as_flatbuffer(schema, ipc_fields);
     FlightData {
         data_header,
         ..Default::default()
     }
 }
 
-/// Convert a [`Schema`] to bytes in the format expected in [`arrow_format::flight::FlightInfo`].
-pub fn serialize_schema_to_info(schema: &Schema) -> Result<Vec<u8>> {
-    let encoded_data = schema_as_encoded_data(schema);
+/// Convert a [`Schema`] to bytes in the format expected in [`arrow_format::flight::data::FlightInfo`].
+pub fn serialize_schema_to_info(schema: &Schema, ipc_fields: &[IpcField]) -> Result<Vec<u8>> {
+    let encoded_data = schema_as_encoded_data(schema, ipc_fields);
 
     let mut schema = vec![];
     write::common_sync::write_message(&mut schema, encoded_data)?;
     Ok(schema)
 }
 
-fn schema_as_flatbuffer(schema: &Schema) -> Vec<u8> {
-    let encoded_data = schema_as_encoded_data(schema);
+fn schema_as_flatbuffer(schema: &Schema, ipc_fields: &[IpcField]) -> Vec<u8> {
+    let encoded_data = schema_as_encoded_data(schema, ipc_fields);
     encoded_data.ipc_message
 }
 
-fn schema_as_encoded_data(schema: &Schema) -> EncodedData {
+fn schema_as_encoded_data(schema: &Schema, ipc_fields: &[IpcField]) -> EncodedData {
     EncodedData {
-        ipc_message: write::schema_to_bytes(schema),
+        ipc_message: write::schema_to_bytes(schema, ipc_fields),
         arrow_data: vec![],
     }
 }
 
 /// Deserialize an IPC message into a schema
 fn schema_from_bytes(bytes: &[u8]) -> Result<Schema> {
     if let Ok(ipc) = ipc::Message::root_as_message(bytes) {
-        if let Some((schema, _)) = ipc.header_as_schema().map(fb_to_schema) {
+        if let Some((schema, _)) = ipc.header_as_schema().map(read::fb_to_schema) {
             Ok(schema)
         } else {
             Err(ArrowError::OutOfSpec(
@@ -126,8 +127,8 @@ impl TryFrom<&SchemaResult> for Schema {
 pub fn deserialize_batch(
     data: &FlightData,
     schema: Arc<Schema>,
-    is_little_endian: bool,
-    dictionaries: &HashMap<usize, Arc<dyn Array>>,
+    ipc_schema: &IpcSchema,
+    dictionaries: &read::Dictionaries,
 ) -> Result<RecordBatch> {
     // check that the data_header is a record batch message
     let message = ipc::Message::root_as_message(&data.data_header[..]).map_err(|err| {
@@ -144,11 +145,11 @@ pub fn deserialize_batch(
         )
     })
     .map(|batch| {
-        read_record_batch(
+        read::read_record_batch(
             batch,
             schema.clone(),
+            ipc_schema,
             None,
-            is_little_endian,
             dictionaries,
             ipc::Schema::MetadataVersion::V5,
             &mut reader,
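Threading the pieces together, a Flight producer now passes IPC metadata to both the schema and the batch serializers. A sketch under the assumption that a flat schema needs one `IpcField` per field; the `ipc_fields_for` helper is invented here, and the export paths are assumed:

    use arrow2::datatypes::Schema;
    use arrow2::io::flight::{serialize_batch, serialize_schema};
    use arrow2::io::ipc::{write, IpcField};
    use arrow2::record_batch::RecordBatch;
    use arrow_format::flight::data::FlightData;

    // Invented helper: default metadata with no dictionary ids, one IpcField
    // per schema field (nested types would need their children filled in).
    fn ipc_fields_for(schema: &Schema) -> Vec<IpcField> {
        schema
            .fields()
            .iter()
            .map(|_| IpcField { fields: vec![], dictionary_id: None })
            .collect()
    }

    fn to_flight(batch: &RecordBatch) -> (FlightData, Vec<FlightData>, FlightData) {
        let ipc_fields = ipc_fields_for(batch.schema());
        let options = write::WriteOptions { compression: None };
        let schema_message = serialize_schema(batch.schema(), &ipc_fields);
        let (dictionaries, data) = serialize_batch(batch, &ipc_fields, &options);
        (schema_message, dictionaries, data)
    }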
(Diffs for the remaining 38 changed files are not shown.)