Skip to content

Commit

Permalink
fix: dramatically reduce checkpoint memory consumption (#2956)
Browse files Browse the repository at this point in the history
Both commits describe the specific fixes, but basically our checkpoint
code was collecting too much into memory when it could iterate!
:roller_coaster:

With a test table:

Before

`Maximum resident set size (kbytes): 19964728`

After

`Maximum resident set size (kbytes): 4017132`


Sponsored-by: [Scribd Inc](https://tech.scribd.com)

---------

Signed-off-by: R. Tyler Croy <[email protected]>
  • Loading branch information
rtyler authored Oct 23, 2024
1 parent 10c6b5c commit c05931a
Showing 1 changed file with 12 additions and 6 deletions.
18 changes: 12 additions & 6 deletions crates/core/src/protocol/checkpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ fn parquet_bytes_from_state(
remove.extended_file_metadata = Some(false);
}
}
let files = state.file_actions().unwrap();
let files = state.file_actions_iter().unwrap();
// protocol
let jsons = std::iter::once(Action::Protocol(Protocol {
min_reader_version: state.protocol().min_reader_version,
Expand Down Expand Up @@ -323,8 +323,8 @@ fn parquet_bytes_from_state(
}))
.map(|a| serde_json::to_value(a).map_err(ProtocolError::from))
// adds
.chain(files.iter().map(|f| {
checkpoint_add_from_state(f, partition_col_data_types.as_slice(), &stats_conversions)
.chain(files.map(|f| {
checkpoint_add_from_state(&f, partition_col_data_types.as_slice(), &stats_conversions)
}));

// Create the arrow schema that represents the Checkpoint parquet file.
Expand All @@ -349,17 +349,23 @@ fn parquet_bytes_from_state(
let mut decoder = ReaderBuilder::new(arrow_schema)
.with_batch_size(CHECKPOINT_RECORD_BATCH_SIZE)
.build_decoder()?;
let jsons = jsons.collect::<Result<Vec<serde_json::Value>, _>>()?;
decoder.serialize(&jsons)?;

// Count of actions
let mut total_actions = 0;

for j in jsons {
let buf = serde_json::to_string(&j?).unwrap();
let _ = decoder.decode(buf.as_bytes())?;
total_actions += 1;
}
while let Some(batch) = decoder.flush()? {
writer.write(&batch)?;
}

let _ = writer.close()?;
debug!("Finished writing checkpoint parquet buffer.");

let checkpoint = CheckPointBuilder::new(state.version(), jsons.len() as i64)
let checkpoint = CheckPointBuilder::new(state.version(), total_actions)
.with_size_in_bytes(bytes.len() as i64)
.build();
Ok((checkpoint, bytes::Bytes::from(bytes)))
Expand Down

0 comments on commit c05931a

Please sign in to comment.