Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Fixed error in writing compressed files (#840)
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao authored Feb 15, 2022
1 parent 618a650 commit b882164
Show file tree
Hide file tree
Showing 4 changed files with 4 additions and 11 deletions.
4 changes: 1 addition & 3 deletions src/io/ipc/compression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,7 @@ pub fn compress_lz4(input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
#[cfg(feature = "io_ipc_compression")]
#[cfg_attr(docsrs, doc(cfg(feature = "io_ipc_compression")))]
pub fn compress_zstd(input_buf: &[u8], output_buf: &mut Vec<u8>) -> Result<()> {
use std::io::Write;
let mut encoder = zstd::Encoder::new(output_buf, 0)?.auto_finish();
encoder.write_all(input_buf).map_err(|e| e.into())
zstd::stream::copy_encode(input_buf, output_buf, 0).map_err(|e| e.into())
}

#[cfg(not(feature = "io_ipc_compression"))]
Expand Down
1 change: 0 additions & 1 deletion src/io/ipc/write/common_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ pub async fn write_message<W: AsyncWrite + Unpin + Send>(
encoded: EncodedData,
) -> Result<(usize, usize)> {
let arrow_data_len = encoded.arrow_data.len();
assert_eq!(arrow_data_len % 8, 0, "Arrow data not aligned");

let a = 8 - 1;
let buffer = encoded.ipc_message;
Expand Down
1 change: 0 additions & 1 deletion src/io/ipc/write/common_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use super::common::EncodedData;
/// Write a message's IPC data and buffers, returning metadata and buffer data lengths written
pub fn write_message<W: Write>(writer: &mut W, encoded: EncodedData) -> Result<(usize, usize)> {
let arrow_data_len = encoded.arrow_data.len();
assert_eq!(arrow_data_len % 8, 0, "Arrow data not aligned");

let a = 8 - 1;
let buffer = encoded.ipc_message;
Expand Down
9 changes: 3 additions & 6 deletions src/io/ipc/write/serialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -657,10 +657,9 @@ fn write_bytes(
}
} else {
arrow_data.extend_from_slice(bytes);
pad_buffer_to_8(arrow_data, arrow_data.len() - start);
};

pad_buffer_to_8(arrow_data, arrow_data.len() - start);

let total_len = (arrow_data.len() - start) as i64;
buffers.push(ipc::Buffer {
offset: *offset,
Expand Down Expand Up @@ -713,10 +712,9 @@ fn write_buffer<T: NativeType>(
_write_compressed_buffer(buffer, arrow_data, is_little_endian, compression);
} else {
_write_buffer(buffer, arrow_data, is_little_endian);
pad_buffer_to_8(arrow_data, arrow_data.len() - start);
};

pad_buffer_to_8(arrow_data, arrow_data.len() - start);

let total_len = (arrow_data.len() - start) as i64;
buffers.push(ipc::Buffer {
offset: *offset,
Expand Down Expand Up @@ -821,10 +819,9 @@ fn write_buffer_from_iter<T: NativeType, I: TrustedLen<Item = T>>(
_write_compressed_buffer_from_iter(buffer, arrow_data, is_little_endian, compression);
} else {
_write_buffer_from_iter(buffer, arrow_data, is_little_endian);
pad_buffer_to_8(arrow_data, arrow_data.len() - start);
}

pad_buffer_to_8(arrow_data, arrow_data.len() - start);

let total_len = (arrow_data.len() - start) as i64;
buffers.push(ipc::Buffer {
offset: *offset,
Expand Down

0 comments on commit b882164

Please sign in to comment.