This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Made IPC read dictionaries by id
jorgecarleitao committed Nov 7, 2021
1 parent d09c2e4 commit a5bbfbb
Showing 18 changed files with 106 additions and 92 deletions.
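In short, the IPC readers stop tracking dictionaries by field position and start tracking them by dictionary id. Both types appear in the hunks below; the aliases in this sketch are only illustrative names for the before/after shapes, not code from the commit:

```rust
use std::collections::HashMap;
use std::sync::Arc;

use crate::array::Array;

// Before: at most one dictionary per top-level field, addressed by the
// field's position in the schema.
type DictionariesByField = Vec<Option<Arc<dyn Array>>>;

// After: dictionaries addressed by their dictionary id, which is also
// reachable from fields nested inside lists, structs, maps and unions.
type Dictionaries = HashMap<usize, Arc<dyn Array>>;
```

Keying by id is what lets nested or shared dictionary-encoded fields locate their values; a positional slice can only describe top-level columns.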
5 changes: 3 additions & 2 deletions src/io/flight/mod.rs
@@ -1,3 +1,4 @@
use std::collections::HashMap;
use std::convert::TryFrom;
use std::sync::Arc;

@@ -122,7 +123,7 @@ pub fn deserialize_batch(
data: &FlightData,
schema: Arc<Schema>,
is_little_endian: bool,
dictionaries_by_field: &[Option<Arc<dyn Array>>],
dictionaries: &HashMap<usize, Arc<dyn Array>>,
) -> Result<RecordBatch> {
// check that the data_header is a record batch message
let message = ipc::Message::root_as_message(&data.data_header[..])
@@ -141,7 +142,7 @@ pub fn deserialize_batch(
schema.clone(),
None,
is_little_endian,
dictionaries_by_field,
dictionaries,
ipc::Schema::MetadataVersion::V5,
&mut reader,
0,
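The hunk above is the public-facing side of the change: `deserialize_batch` now takes the id-keyed map. A minimal, hypothetical caller-side sketch of building that map (the `decoded` pairs and their ids are assumptions, not part of the commit):

```rust
use std::collections::HashMap;
use std::sync::Arc;

use crate::array::Array;

/// Hypothetical helper: key already-decoded dictionary values by their
/// dictionary id, producing the map `deserialize_batch` now expects.
fn index_dictionaries(
    decoded: Vec<(usize, Arc<dyn Array>)>, // (dict_id, values) pairs, assumed
) -> HashMap<usize, Arc<dyn Array>> {
    decoded.into_iter().collect()
}
```

How the dictionary messages themselves are decoded is outside this excerpt; the point is only that consumers now index by id rather than by field position.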
2 changes: 1 addition & 1 deletion src/io/ipc/read/array/binary.rs
@@ -25,7 +25,7 @@ pub fn read_binary<O: Offset, R: Read + Seek>(
where
Vec<u8>: TryInto<O::Bytes> + TryInto<<u8 as NativeType>::Bytes>,
{
let field_node = field_nodes.pop_front().unwrap().0;
let field_node = field_nodes.pop_front().unwrap();

let validity = read_validity(
buffers,
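This is the first of many `.unwrap().0` → `.unwrap()` edits: the items in `field_nodes` apparently no longer pair each `FieldNode` with an optional dictionary, because dictionaries now live in the shared map. The `Node` alias itself (in `read/deserialize.rs`) is not part of this excerpt; its presumed post-commit shape is:

```rust
use arrow_format::ipc;

// Presumed shape of the `Node` alias after this commit (the defining file is
// not shown in this diff). Before, each node was roughly a pair
// `(&ipc::Message::FieldNode, &Option<Arc<dyn Array>>)`; the dropped `.0`
// projections in these hunks are the dictionary half of that pair going away.
pub type Node<'a> = &'a ipc::Message::FieldNode;
```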
2 changes: 1 addition & 1 deletion src/io/ipc/read/array/boolean.rs
@@ -19,7 +19,7 @@ pub fn read_boolean<R: Read + Seek>(
is_little_endian: bool,
compression: Option<ipc::Message::BodyCompression>,
) -> Result<BooleanArray> {
let field_node = field_nodes.pop_front().unwrap().0;
let field_node = field_nodes.pop_front().unwrap();

let length = field_node.length() as usize;
let validity = read_validity(
16 changes: 12 additions & 4 deletions src/io/ipc/read/array/dictionary.rs
@@ -1,27 +1,35 @@
use std::collections::VecDeque;
use std::collections::{HashMap, VecDeque};
use std::convert::TryInto;
use std::io::{Read, Seek};
use std::sync::Arc;

use arrow_format::ipc;

use crate::array::{DictionaryArray, DictionaryKey};
use crate::array::{Array, DictionaryArray, DictionaryKey};
use crate::datatypes::Field;
use crate::error::Result;

use super::super::deserialize::Node;
use super::{read_primitive, skip_primitive};

#[allow(clippy::too_many_arguments)]
pub fn read_dictionary<T: DictionaryKey, R: Read + Seek>(
field_nodes: &mut VecDeque<Node>,
field: &Field,
buffers: &mut VecDeque<&ipc::Schema::Buffer>,
reader: &mut R,
dictionaries: &HashMap<usize, Arc<dyn Array>>,
block_offset: u64,
compression: Option<ipc::Message::BodyCompression>,
is_little_endian: bool,
) -> Result<DictionaryArray<T>>
where
Vec<u8>: TryInto<T::Bytes>,
{
let values = field_nodes.front().unwrap().1.as_ref().unwrap();
let values = dictionaries
.get(&(field.dict_id().unwrap() as usize))
.unwrap()
.clone();

let keys = read_primitive(
field_nodes,
@@ -33,7 +41,7 @@ where
compression,
)?;

Ok(DictionaryArray::<T>::from_data(keys, values.clone()))
Ok(DictionaryArray::<T>::from_data(keys, values))
}

pub fn skip_dictionary(
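The hunk above is the heart of the commit: `read_dictionary` no longer takes the values from the node queue but resolves them through `Field::dict_id`. For illustration, a hypothetical helper (not part of the commit) doing the same lookup without the `unwrap`s, assuming `dict_id` returns an `Option` as the code above implies:

```rust
use std::collections::HashMap;
use std::sync::Arc;

use crate::array::Array;
use crate::datatypes::Field;

/// Hypothetical: resolve a field's dictionary values from the id-keyed map,
/// yielding None when the field has no dictionary id or the id was never read.
fn resolve_dictionary(
    field: &Field,
    dictionaries: &HashMap<usize, Arc<dyn Array>>,
) -> Option<Arc<dyn Array>> {
    let id = field.dict_id()? as usize;
    dictionaries.get(&id).cloned()
}
```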
2 changes: 1 addition & 1 deletion src/io/ipc/read/array/fixed_size_binary.rs
@@ -19,7 +19,7 @@ pub fn read_fixed_size_binary<R: Read + Seek>(
is_little_endian: bool,
compression: Option<ipc::Message::BodyCompression>,
) -> Result<FixedSizeBinaryArray> {
let field_node = field_nodes.pop_front().unwrap().0;
let field_node = field_nodes.pop_front().unwrap();

let validity = read_validity(
buffers,
13 changes: 8 additions & 5 deletions src/io/ipc/read/array/fixed_size_list.rs
@@ -1,9 +1,10 @@
use std::collections::VecDeque;
use std::collections::{HashMap, VecDeque};
use std::io::{Read, Seek};
use std::sync::Arc;

use arrow_format::ipc;

use crate::array::FixedSizeListArray;
use crate::array::{Array, FixedSizeListArray};
use crate::datatypes::DataType;
use crate::error::Result;

@@ -16,12 +17,13 @@ pub fn read_fixed_size_list<R: Read + Seek>(
data_type: DataType,
buffers: &mut VecDeque<&ipc::Schema::Buffer>,
reader: &mut R,
dictionaries: &HashMap<usize, Arc<dyn Array>>,
block_offset: u64,
is_little_endian: bool,
compression: Option<ipc::Message::BodyCompression>,
version: ipc::Schema::MetadataVersion,
) -> Result<FixedSizeListArray> {
let field_node = field_nodes.pop_front().unwrap().0;
let field_node = field_nodes.pop_front().unwrap();

let validity = read_validity(
buffers,
@@ -32,13 +34,14 @@ pub fn read_fixed_size_list<R: Read + Seek>(
compression,
)?;

let (value_data_type, _) = FixedSizeListArray::get_child_and_size(&data_type);
let (field, _) = FixedSizeListArray::get_child_and_size(&data_type);

let values = read(
field_nodes,
value_data_type.data_type().clone(),
field,
buffers,
reader,
dictionaries,
block_offset,
is_little_endian,
compression,
13 changes: 8 additions & 5 deletions src/io/ipc/read/array/list.rs
@@ -1,10 +1,11 @@
use std::collections::VecDeque;
use std::collections::{HashMap, VecDeque};
use std::convert::TryInto;
use std::io::{Read, Seek};
use std::sync::Arc;

use arrow_format::ipc;

use crate::array::{ListArray, Offset};
use crate::array::{Array, ListArray, Offset};
use crate::buffer::Buffer;
use crate::datatypes::DataType;
use crate::error::Result;
@@ -18,6 +19,7 @@ pub fn read_list<O: Offset, R: Read + Seek>(
data_type: DataType,
buffers: &mut VecDeque<&ipc::Schema::Buffer>,
reader: &mut R,
dictionaries: &HashMap<usize, Arc<dyn Array>>,
block_offset: u64,
is_little_endian: bool,
compression: Option<ipc::Message::BodyCompression>,
@@ -26,7 +28,7 @@
where
Vec<u8>: TryInto<O::Bytes>,
{
let field_node = field_nodes.pop_front().unwrap().0;
let field_node = field_nodes.pop_front().unwrap();

let validity = read_validity(
buffers,
@@ -48,13 +50,14 @@ where
// Older versions of the IPC format sometimes do not report an offset
.or_else(|_| Result::Ok(Buffer::<O>::from(&[O::default()])))?;

let value_data_type = ListArray::<O>::get_child_type(&data_type).clone();
let field = ListArray::<O>::get_child_field(&data_type);

let values = read(
field_nodes,
value_data_type,
field,
buffers,
reader,
dictionaries,
block_offset,
is_little_endian,
compression,
13 changes: 8 additions & 5 deletions src/io/ipc/read/array/map.rs
@@ -1,9 +1,10 @@
use std::collections::VecDeque;
use std::collections::{HashMap, VecDeque};
use std::io::{Read, Seek};
use std::sync::Arc;

use arrow_format::ipc;

use crate::array::MapArray;
use crate::array::{Array, MapArray};
use crate::buffer::Buffer;
use crate::datatypes::DataType;
use crate::error::Result;
@@ -17,12 +18,13 @@ pub fn read_map<R: Read + Seek>(
data_type: DataType,
buffers: &mut VecDeque<&ipc::Schema::Buffer>,
reader: &mut R,
dictionaries: &HashMap<usize, Arc<dyn Array>>,
block_offset: u64,
is_little_endian: bool,
compression: Option<ipc::Message::BodyCompression>,
version: ipc::Schema::MetadataVersion,
) -> Result<MapArray> {
let field_node = field_nodes.pop_front().unwrap().0;
let field_node = field_nodes.pop_front().unwrap();

let validity = read_validity(
buffers,
@@ -44,13 +46,14 @@ pub fn read_map<R: Read + Seek>(
// Older versions of the IPC format sometimes do not report an offset
.or_else(|_| Result::Ok(Buffer::<i32>::from(&[0i32])))?;

let value_data_type = MapArray::get_field(&data_type).data_type().clone();
let field = MapArray::get_field(&data_type);

let field = read(
field_nodes,
value_data_type,
field,
buffers,
reader,
dictionaries,
block_offset,
is_little_endian,
compression,
2 changes: 1 addition & 1 deletion src/io/ipc/read/array/null.rs
@@ -7,7 +7,7 @@ use super::super::deserialize::Node;
pub fn read_null(field_nodes: &mut VecDeque<Node>, data_type: DataType) -> NullArray {
NullArray::from_data(
data_type,
field_nodes.pop_front().unwrap().0.length() as usize,
field_nodes.pop_front().unwrap().length() as usize,
)
}

2 changes: 1 addition & 1 deletion src/io/ipc/read/array/primitive.rs
@@ -22,7 +22,7 @@ pub fn read_primitive<T: NativeType, R: Read + Seek>(
where
Vec<u8>: TryInto<T::Bytes>,
{
let field_node = field_nodes.pop_front().unwrap().0;
let field_node = field_nodes.pop_front().unwrap();

let validity = read_validity(
buffers,
11 changes: 7 additions & 4 deletions src/io/ipc/read/array/struct_.rs
@@ -1,9 +1,10 @@
use std::collections::VecDeque;
use std::collections::{HashMap, VecDeque};
use std::io::{Read, Seek};
use std::sync::Arc;

use arrow_format::ipc;

use crate::array::StructArray;
use crate::array::{Array, StructArray};
use crate::datatypes::DataType;
use crate::error::Result;

@@ -16,12 +17,13 @@ pub fn read_struct<R: Read + Seek>(
data_type: DataType,
buffers: &mut VecDeque<&ipc::Schema::Buffer>,
reader: &mut R,
dictionaries: &HashMap<usize, Arc<dyn Array>>,
block_offset: u64,
is_little_endian: bool,
compression: Option<ipc::Message::BodyCompression>,
version: ipc::Schema::MetadataVersion,
) -> Result<StructArray> {
let field_node = field_nodes.pop_front().unwrap().0;
let field_node = field_nodes.pop_front().unwrap();

let validity = read_validity(
buffers,
@@ -39,9 +41,10 @@
.map(|field| {
read(
field_nodes,
field.data_type().clone(),
field,
buffers,
reader,
dictionaries,
block_offset,
is_little_endian,
compression,
11 changes: 7 additions & 4 deletions src/io/ipc/read/array/union.rs
@@ -1,9 +1,10 @@
use std::collections::VecDeque;
use std::collections::{HashMap, VecDeque};
use std::io::{Read, Seek};
use std::sync::Arc;

use arrow_format::ipc;

use crate::array::UnionArray;
use crate::array::{Array, UnionArray};
use crate::datatypes::DataType;
use crate::datatypes::UnionMode::Dense;
use crate::error::Result;
@@ -17,12 +18,13 @@ pub fn read_union<R: Read + Seek>(
data_type: DataType,
buffers: &mut VecDeque<&ipc::Schema::Buffer>,
reader: &mut R,
dictionaries: &HashMap<usize, Arc<dyn Array>>,
block_offset: u64,
is_little_endian: bool,
compression: Option<ipc::Message::BodyCompression>,
version: ipc::Schema::MetadataVersion,
) -> Result<UnionArray> {
let field_node = field_nodes.pop_front().unwrap().0;
let field_node = field_nodes.pop_front().unwrap();

if version != ipc::Schema::MetadataVersion::V5 {
let _ = buffers.pop_front().unwrap();
@@ -61,9 +63,10 @@ .map(|field| {
.map(|field| {
read(
field_nodes,
field.data_type().clone(),
field,
buffers,
reader,
dictionaries,
block_offset,
is_little_endian,
compression,
2 changes: 1 addition & 1 deletion src/io/ipc/read/array/utf8.rs
@@ -25,7 +25,7 @@ pub fn read_utf8<O: Offset, R: Read + Seek>(
where
Vec<u8>: TryInto<O::Bytes> + TryInto<<u8 as NativeType>::Bytes>,
{
let field_node = field_nodes.pop_front().unwrap().0;
let field_node = field_nodes.pop_front().unwrap();

let validity = read_validity(
buffers,
(The remaining 5 of the 18 changed files are not shown in this excerpt.)

0 comments on commit a5bbfbb
