Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
don't allocate padding bytes
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Jan 16, 2023
1 parent 211be21 commit 3fc6241
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 9 deletions.
8 changes: 5 additions & 3 deletions src/io/ipc/write/common_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ use super::common::pad_to_64;
use super::common::EncodedData;

/// Write a message's IPC data and buffers, returning metadata and buffer data lengths written
pub fn write_message<W: Write>(writer: &mut W, encoded: EncodedData) -> Result<(usize, usize)> {
pub fn write_message<W: Write>(writer: &mut W, encoded: &EncodedData) -> Result<(usize, usize)> {
let arrow_data_len = encoded.arrow_data.len();

let a = 8 - 1;
let buffer = encoded.ipc_message;
let buffer = &encoded.ipc_message;
let flatbuf_size = buffer.len();
let prefix_size = 8;
let aligned_size = (flatbuf_size + prefix_size + a) & !a;
Expand All @@ -24,7 +24,9 @@ pub fn write_message<W: Write>(writer: &mut W, encoded: EncodedData) -> Result<(
writer.write_all(&buffer)?;
}
// write padding
writer.write_all(&vec![0; padding_bytes])?;
// aligned to a 8 byte boundary, so maximum is [u8;8]
const PADDING_MAX: [u8; 8] = [0u8; 8];
writer.write_all(&PADDING_MAX[..padding_bytes])?;

// write arrow data
let body_len = if arrow_data_len > 0 {
Expand Down
6 changes: 3 additions & 3 deletions src/io/ipc/write/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ impl<W: Write> StreamWriter<W> {
ipc_message: schema_to_bytes(schema, self.ipc_fields.as_ref().unwrap()),
arrow_data: vec![],
};
write_message(&mut self.writer, encoded_message)?;
write_message(&mut self.writer, &encoded_message)?;
Ok(())
}

Expand Down Expand Up @@ -91,10 +91,10 @@ impl<W: Write> StreamWriter<W> {
)?;

for encoded_dictionary in encoded_dictionaries {
write_message(&mut self.writer, encoded_dictionary)?;
write_message(&mut self.writer, &encoded_dictionary)?;
}

write_message(&mut self.writer, encoded_message)?;
write_message(&mut self.writer, &encoded_message)?;
Ok(())
}

Expand Down
6 changes: 3 additions & 3 deletions src/io/ipc/write/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ impl<W: Write> FileWriter<W> {
arrow_data: vec![],
};

let (meta, data) = write_message(&mut self.writer, encoded_message)?;
let (meta, data) = write_message(&mut self.writer, &encoded_message)?;
self.block_offsets += meta + data + 8; // 8 <=> arrow magic + 2 bytes for alignment
self.state = State::Started;
Ok(())
Expand Down Expand Up @@ -142,7 +142,7 @@ impl<W: Write> FileWriter<W> {

// add all dictionaries
for encoded_dictionary in encoded_dictionaries {
let (meta, data) = write_message(&mut self.writer, encoded_dictionary)?;
let (meta, data) = write_message(&mut self.writer, &encoded_dictionary)?;

let block = arrow_format::ipc::Block {
offset: self.block_offsets as i64,
Expand All @@ -153,7 +153,7 @@ impl<W: Write> FileWriter<W> {
self.block_offsets += meta + data;
}

let (meta, data) = write_message(&mut self.writer, encoded_message)?;
let (meta, data) = write_message(&mut self.writer, &encoded_message)?;
// add a record block for the footer
let block = arrow_format::ipc::Block {
offset: self.block_offsets as i64,
Expand Down

0 comments on commit 3fc6241

Please sign in to comment.