Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

feat: better error message when reading feather v1 #1528

Merged
merged 3 commits into from
Aug 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions src/ffi/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ unsafe fn get_buffer_ptr<T: NativeType>(
let ptr = *buffers.add(index);
if ptr.is_null() {
return Err(Error::oos(format!(
"An array of type {data_type:?}
"An array of type {data_type:?}
must have a non-null buffer {index}"
)));
}
Expand All @@ -235,9 +235,14 @@ unsafe fn create_buffer<T: NativeType>(
owner: InternalArrowArray,
index: usize,
) -> Result<Buffer<T>> {
let len = buffer_len(array, data_type, index)?;

if len == 0 {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

C++ Arrow can produce null buffer pointers when arrays are empty. Returning an empty buffer early when the length is zero ensures we don't have to deal with those null pointers.

return Ok(Buffer::new());
}

let ptr = get_buffer_ptr(array, data_type, index)?;

let len = buffer_len(array, data_type, index)?;
let offset = buffer_offset(array, data_type, index);
let bytes = Bytes::from_foreign(ptr, len, BytesAllocator::InternalArrowArray(owner));

Expand All @@ -258,9 +263,12 @@ unsafe fn create_bitmap(
// we can use the null count directly
is_validity: bool,
) -> Result<Bitmap> {
let len: usize = array.length.try_into().expect("length to fit in `usize`");
if len == 0 {
return Ok(Bitmap::new());
}
let ptr = get_buffer_ptr(array, data_type, index)?;

let len: usize = array.length.try_into().expect("length to fit in `usize`");
let offset: usize = array.offset.try_into().expect("offset to fit in `usize`");
let bytes_len = bytes_for(offset + len);
let bytes = Bytes::from_foreign(ptr, bytes_len, BytesAllocator::InternalArrowArray(owner));
Expand Down
3 changes: 2 additions & 1 deletion src/io/ipc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ pub mod append;
pub mod read;
pub mod write;

const ARROW_MAGIC: [u8; 6] = [b'A', b'R', b'R', b'O', b'W', b'1'];
const ARROW_MAGIC_V1: [u8; 4] = [b'F', b'E', b'A', b'1'];
const ARROW_MAGIC_V2: [u8; 6] = [b'A', b'R', b'R', b'O', b'W', b'1'];
pub(crate) const CONTINUATION_MARKER: [u8; 4] = [0xff; 4];

/// Struct containing `dictionary_id` and nested `IpcField`, allowing users
Expand Down
9 changes: 6 additions & 3 deletions src/io/ipc/read/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::datatypes::Schema;
use crate::error::{Error, Result};
use crate::io::ipc::IpcSchema;

use super::super::{ARROW_MAGIC, CONTINUATION_MARKER};
use super::super::{ARROW_MAGIC_V1, ARROW_MAGIC_V2, CONTINUATION_MARKER};
use super::common::*;
use super::schema::fb_to_schema;
use super::Dictionaries;
Expand Down Expand Up @@ -151,7 +151,7 @@ fn read_footer_len<R: Read + Seek>(reader: &mut R) -> Result<(u64, usize)> {
reader.read_exact(&mut footer)?;
let footer_len = i32::from_le_bytes(footer[..4].try_into().unwrap());

if footer[4..] != ARROW_MAGIC {
if footer[4..] != ARROW_MAGIC_V2 {
return Err(Error::from(OutOfSpecKind::InvalidFooter));
}
let footer_len = footer_len
Expand Down Expand Up @@ -215,7 +215,10 @@ pub fn read_file_metadata<R: Read + Seek>(reader: &mut R) -> Result<FileMetadata
let mut magic_buffer: [u8; 6] = [0; 6];
let start = reader.stream_position()?;
reader.read_exact(&mut magic_buffer)?;
if magic_buffer != ARROW_MAGIC {
if magic_buffer != ARROW_MAGIC_V2 {
if &magic_buffer[..4] == ARROW_MAGIC_V1 {
return Err(Error::NotYetImplemented("feather v1 not supported".into()));
}
return Err(Error::from(OutOfSpecKind::InvalidHeader));
}

Expand Down
4 changes: 2 additions & 2 deletions src/io/ipc/read/file_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::array::*;
use crate::chunk::Chunk;
use crate::datatypes::{Field, Schema};
use crate::error::{Error, Result};
use crate::io::ipc::{IpcSchema, ARROW_MAGIC, CONTINUATION_MARKER};
use crate::io::ipc::{IpcSchema, ARROW_MAGIC_V2, CONTINUATION_MARKER};

use super::common::{apply_projection, prepare_projection, read_dictionary, read_record_batch};
use super::file::{deserialize_footer, get_record_batch};
Expand Down Expand Up @@ -135,7 +135,7 @@ async fn read_footer_len<R: AsyncRead + AsyncSeek + Unpin>(reader: &mut R) -> Re
reader.read_exact(&mut footer).await?;
let footer_len = i32::from_le_bytes(footer[..4].try_into().unwrap());

if footer[4..] != ARROW_MAGIC {
if footer[4..] != ARROW_MAGIC_V2 {
return Err(Error::from(OutOfSpecKind::InvalidFooter));
}
footer_len
Expand Down
6 changes: 3 additions & 3 deletions src/io/ipc/write/file_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use super::schema::serialize_schema;
use super::{default_ipc_fields, schema_to_bytes, Record};
use crate::datatypes::*;
use crate::error::{Error, Result};
use crate::io::ipc::{IpcField, ARROW_MAGIC};
use crate::io::ipc::{IpcField, ARROW_MAGIC_V2};

type WriteOutput<W> = (usize, Option<Block>, Vec<Block>, Option<W>);

Expand Down Expand Up @@ -105,7 +105,7 @@ where
}

async fn start(mut writer: W, encoded: EncodedData) -> Result<WriteOutput<W>> {
writer.write_all(&ARROW_MAGIC[..]).await?;
writer.write_all(&ARROW_MAGIC_V2[..]).await?;
writer.write_all(&[0, 0]).await?;
let (meta, data) = write_message(&mut writer, encoded).await?;

Expand Down Expand Up @@ -149,7 +149,7 @@ where
writer
.write_all(&(footer.len() as i32).to_le_bytes())
.await?;
writer.write_all(&ARROW_MAGIC).await?;
writer.write_all(&ARROW_MAGIC_V2).await?;
writer.close().await?;

Ok((0, None, vec![], None))
Expand Down
6 changes: 3 additions & 3 deletions src/io/ipc/write/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use arrow_format::ipc::planus::Builder;

use super::{
super::IpcField,
super::ARROW_MAGIC,
super::ARROW_MAGIC_V2,
common::{DictionaryTracker, EncodedData, WriteOptions},
common_sync::{write_continuation, write_message},
default_ipc_fields, schema, schema_to_bytes,
Expand Down Expand Up @@ -114,7 +114,7 @@ impl<W: Write> FileWriter<W> {
return Err(Error::oos("The IPC file can only be started once"));
}
// write magic to header
self.writer.write_all(&ARROW_MAGIC[..])?;
self.writer.write_all(&ARROW_MAGIC_V2[..])?;
// create an 8-byte boundary after the header
self.writer.write_all(&[0, 0])?;
// write the schema, set the written bytes to the schema
Expand Down Expand Up @@ -205,7 +205,7 @@ impl<W: Write> FileWriter<W> {
self.writer.write_all(footer_data)?;
self.writer
.write_all(&(footer_data.len() as i32).to_le_bytes())?;
self.writer.write_all(&ARROW_MAGIC)?;
self.writer.write_all(&ARROW_MAGIC_V2)?;
self.writer.flush()?;
self.state = State::Finished;

Expand Down