From 11e5369203085b02b523dd5bbba38da82102b9f8 Mon Sep 17 00:00:00 2001 From: Ritchie Vink Date: Mon, 16 Jan 2023 13:43:49 +0100 Subject: [PATCH] amortize encoded_message --- src/io/ipc/append/mod.rs | 1 + src/io/ipc/write/common.rs | 42 +++++++++++++++++++++++++++++--------- src/io/ipc/write/writer.rs | 14 +++++++++---- 3 files changed, 43 insertions(+), 14 deletions(-) diff --git a/src/io/ipc/append/mod.rs b/src/io/ipc/append/mod.rs index 9e084a45896..6e637cd0e81 100644 --- a/src/io/ipc/append/mod.rs +++ b/src/io/ipc/append/mod.rs @@ -67,6 +67,7 @@ impl FileWriter { dictionaries, cannot_replace: true, }, + encoded_message: Default::default(), }) } } diff --git a/src/io/ipc/write/common.rs b/src/io/ipc/write/common.rs index e2bff77a7d5..155a0079c67 100644 --- a/src/io/ipc/write/common.rs +++ b/src/io/ipc/write/common.rs @@ -177,6 +177,25 @@ pub fn encode_chunk( dictionary_tracker: &mut DictionaryTracker, options: &WriteOptions, ) -> Result<(Vec<EncodedData>, EncodedData)> { + let mut encoded_message = EncodedData::default(); + let encoded_dictionaries = encode_chunk_amortized( + chunk, + fields, + dictionary_tracker, + options, + &mut encoded_message, + )?; + Ok((encoded_dictionaries, encoded_message)) +} + +// Amortizes `EncodedData` allocation.
+pub fn encode_chunk_amortized( + chunk: &Chunk<Box<dyn Array>>, + fields: &[IpcField], + dictionary_tracker: &mut DictionaryTracker, + options: &WriteOptions, + encoded_message: &mut EncodedData, +) -> Result<Vec<EncodedData>> { let mut encoded_dictionaries = vec![]; for (field, array) in fields.iter().zip(chunk.as_ref()) { @@ -189,9 +208,9 @@ pub fn encode_chunk( )?; } - let encoded_message = chunk_to_bytes(chunk, options); + chunk_to_bytes_amortized(chunk, options, encoded_message); - Ok((encoded_dictionaries, encoded_message)) + Ok(encoded_dictionaries) } fn serialize_compression( @@ -213,10 +232,16 @@ fn serialize_compression( /// Write [`Chunk`] into two sets of bytes, one for the header (ipc::Schema::Message) and the /// other for the batch's data -fn chunk_to_bytes(chunk: &Chunk<Box<dyn Array>>, options: &WriteOptions) -> EncodedData { +fn chunk_to_bytes_amortized( + chunk: &Chunk<Box<dyn Array>>, + options: &WriteOptions, + encoded_message: &mut EncodedData, +) { let mut nodes: Vec<arrow_format::ipc::FieldNode> = vec![]; let mut buffers: Vec<arrow_format::ipc::Buffer> = vec![]; - let mut arrow_data: Vec<u8> = vec![]; + let mut arrow_data = std::mem::take(&mut encoded_message.arrow_data); + arrow_data.clear(); + let mut offset = 0; for array in chunk.arrays() { write( @@ -248,11 +273,8 @@ fn chunk_to_bytes(chunk: &Chunk<Box<dyn Array>>, options: &WriteOptions) -> Enco let mut builder = Builder::new(); let ipc_message = builder.finish(&message, None); - - EncodedData { - ipc_message: ipc_message.to_vec(), - arrow_data, - } + encoded_message.ipc_message = ipc_message.to_vec(); + encoded_message.arrow_data = arrow_data } /// Write dictionary values into two sets of bytes, one for the header (ipc::Schema::Message) and the @@ -360,7 +382,7 @@ impl DictionaryTracker { } /// Stores the encoded data, which is an ipc::Schema::Message, and optional Arrow data -#[derive(Debug)] +#[derive(Debug, Default)] pub struct EncodedData { /// An encoded ipc::Schema::Message pub ipc_message: Vec<u8>, diff --git a/src/io/ipc/write/writer.rs b/src/io/ipc/write/writer.rs index 882faa5cbb9..abc9f0029c2 100644 ---
a/src/io/ipc/write/writer.rs +++ b/src/io/ipc/write/writer.rs @@ -5,7 +5,7 @@ use arrow_format::ipc::planus::Builder; use super::{ super::IpcField, super::ARROW_MAGIC, - common::{encode_chunk, DictionaryTracker, EncodedData, WriteOptions}, + common::{DictionaryTracker, EncodedData, WriteOptions}, common_sync::{write_continuation, write_message}, default_ipc_fields, schema, schema_to_bytes, }; @@ -14,6 +14,7 @@ use crate::array::Array; use crate::chunk::Chunk; use crate::datatypes::*; use crate::error::{Error, Result}; +use crate::io::ipc::write::common::encode_chunk_amortized; #[derive(Clone, Copy, PartialEq, Eq)] pub(crate) enum State { @@ -41,6 +42,10 @@ pub struct FileWriter { pub(crate) state: State, /// Keeps track of dictionaries that have been written pub(crate) dictionary_tracker: DictionaryTracker, + /// Buffer/scratch that is reused between writes + /// This is public so a user can swap this in between + /// creating new writers to save/amortize allocations. + pub encoded_message: EncodedData, } impl FileWriter { @@ -83,6 +88,7 @@ impl FileWriter { dictionaries: Default::default(), cannot_replace: true, }, + encoded_message: Default::default(), } } @@ -132,12 +138,12 @@ impl FileWriter { } else { self.ipc_fields.as_ref() }; - - let (encoded_dictionaries, encoded_message) = encode_chunk( + let encoded_dictionaries = encode_chunk_amortized( chunk, ipc_fields, &mut self.dictionary_tracker, &self.options, + &mut self.encoded_message, )?; // add all dictionaries @@ -153,7 +159,7 @@ impl FileWriter { self.block_offsets += meta + data; } - let (meta, data) = write_message(&mut self.writer, &encoded_message)?; + let (meta, data) = write_message(&mut self.writer, &self.encoded_message)?; // add a record block for the footer let block = arrow_format::ipc::Block { offset: self.block_offsets as i64,