From b882164d3fe3e62b5a18089b1815848abfda13e2 Mon Sep 17 00:00:00 2001 From: Jorge Leitao Date: Tue, 15 Feb 2022 18:24:36 +0100 Subject: [PATCH] Fixed error in writing compressed files (#840) --- src/io/ipc/compression.rs | 4 +--- src/io/ipc/write/common_async.rs | 1 - src/io/ipc/write/common_sync.rs | 1 - src/io/ipc/write/serialize.rs | 9 +++------ 4 files changed, 4 insertions(+), 11 deletions(-) diff --git a/src/io/ipc/compression.rs b/src/io/ipc/compression.rs index 03b06868ae0..482786c5a4c 100644 --- a/src/io/ipc/compression.rs +++ b/src/io/ipc/compression.rs @@ -44,9 +44,7 @@ pub fn compress_lz4(input_buf: &[u8], output_buf: &mut Vec) -> Result<()> { #[cfg(feature = "io_ipc_compression")] #[cfg_attr(docsrs, doc(cfg(feature = "io_ipc_compression")))] pub fn compress_zstd(input_buf: &[u8], output_buf: &mut Vec) -> Result<()> { - use std::io::Write; - let mut encoder = zstd::Encoder::new(output_buf, 0)?.auto_finish(); - encoder.write_all(input_buf).map_err(|e| e.into()) + zstd::stream::copy_encode(input_buf, output_buf, 0).map_err(|e| e.into()) } #[cfg(not(feature = "io_ipc_compression"))] diff --git a/src/io/ipc/write/common_async.rs b/src/io/ipc/write/common_async.rs index d9e28a0e386..cade8331dc1 100644 --- a/src/io/ipc/write/common_async.rs +++ b/src/io/ipc/write/common_async.rs @@ -13,7 +13,6 @@ pub async fn write_message( encoded: EncodedData, ) -> Result<(usize, usize)> { let arrow_data_len = encoded.arrow_data.len(); - assert_eq!(arrow_data_len % 8, 0, "Arrow data not aligned"); let a = 8 - 1; let buffer = encoded.ipc_message; diff --git a/src/io/ipc/write/common_sync.rs b/src/io/ipc/write/common_sync.rs index 60e47a65142..bafde8664c4 100644 --- a/src/io/ipc/write/common_sync.rs +++ b/src/io/ipc/write/common_sync.rs @@ -9,7 +9,6 @@ use super::common::EncodedData; /// Write a message's IPC data and buffers, returning metadata and buffer data lengths written pub fn write_message(writer: &mut W, encoded: EncodedData) -> Result<(usize, usize)> { let arrow_data_len = encoded.arrow_data.len(); - assert_eq!(arrow_data_len % 8, 0, "Arrow data not aligned"); let a = 8 - 1; let buffer = encoded.ipc_message; diff --git a/src/io/ipc/write/serialize.rs b/src/io/ipc/write/serialize.rs index bb7b245e974..0ff8c5939e4 100644 --- a/src/io/ipc/write/serialize.rs +++ b/src/io/ipc/write/serialize.rs @@ -657,10 +657,9 @@ fn write_bytes( } } else { arrow_data.extend_from_slice(bytes); + pad_buffer_to_8(arrow_data, arrow_data.len() - start); }; - pad_buffer_to_8(arrow_data, arrow_data.len() - start); - let total_len = (arrow_data.len() - start) as i64; buffers.push(ipc::Buffer { offset: *offset, @@ -713,10 +712,9 @@ fn write_buffer( _write_compressed_buffer(buffer, arrow_data, is_little_endian, compression); } else { _write_buffer(buffer, arrow_data, is_little_endian); + pad_buffer_to_8(arrow_data, arrow_data.len() - start); }; - pad_buffer_to_8(arrow_data, arrow_data.len() - start); - let total_len = (arrow_data.len() - start) as i64; buffers.push(ipc::Buffer { offset: *offset, @@ -821,10 +819,9 @@ fn write_buffer_from_iter>( _write_compressed_buffer_from_iter(buffer, arrow_data, is_little_endian, compression); } else { _write_buffer_from_iter(buffer, arrow_data, is_little_endian); + pad_buffer_to_8(arrow_data, arrow_data.len() - start); } - pad_buffer_to_8(arrow_data, arrow_data.len() - start); - let total_len = (arrow_data.len() - start) as i64; buffers.push(ipc::Buffer { offset: *offset,