Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix record batch memory size double counting #13377

Merged
merged 5 commits into from
Nov 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions datafusion/core/tests/memory_limit/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use datafusion_execution::memory_pool::{
};
use datafusion_expr::{Expr, TableType};
use datafusion_physical_expr::{LexOrdering, PhysicalSortExpr};
use datafusion_physical_plan::spill::get_record_batch_memory_size;
use futures::StreamExt;
use std::any::Any;
use std::num::NonZeroUsize;
Expand Down Expand Up @@ -265,14 +266,18 @@ async fn sort_spill_reservation() {
// This test case shows how sort_spill_reservation works by
// purposely sorting data that requires non trivial memory to
// sort/merge.

// Merge operation needs extra memory to do row conversion, so make the
// memory limit larger.
let mem_limit = partition_size * 2;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be introduced as DataFusion parameter so the user can configure the memory allocation realm. I got some feeling the mem is data dependent, depending on datatypes/data being processed

let test = TestCase::new()
// This query uses a different order than the input table to
// force a sort. It also needs to have multiple columns to
// force RowFormat / interner that makes merge require
// substantial memory
.with_query("select * from t ORDER BY a , b DESC")
// enough memory to sort if we don't try to merge it all at once
.with_memory_limit(partition_size)
.with_memory_limit(mem_limit)
// use a single partition so only a sort is needed
.with_scenario(scenario)
.with_disk_manager_config(DiskManagerConfig::NewOs)
Expand Down Expand Up @@ -311,7 +316,7 @@ async fn sort_spill_reservation() {
// reserve sufficient space up front for merge and this time,
// which will force the spills to happen with less buffered
// input and thus with enough to merge.
.with_sort_spill_reservation_bytes(partition_size / 2);
.with_sort_spill_reservation_bytes(mem_limit / 2);

test.with_config(config).with_expected_success().run().await;
}
Expand Down Expand Up @@ -774,7 +779,7 @@ fn make_dict_batches() -> Vec<RecordBatch> {

// How many bytes does the memory from dict_batches consume?
fn batches_byte_size(batches: &[RecordBatch]) -> usize {
batches.iter().map(|b| b.get_array_memory_size()).sum()
batches.iter().map(get_record_batch_memory_size).sum()
}

#[derive(Debug)]
Expand Down
6 changes: 4 additions & 2 deletions datafusion/physical-plan/src/sorts/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use crate::spill::get_record_batch_memory_size;
use arrow::compute::interleave;
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
Expand Down Expand Up @@ -69,7 +70,8 @@ impl BatchBuilder {

/// Append a new batch in `stream_idx`
pub fn push_batch(&mut self, stream_idx: usize, batch: RecordBatch) -> Result<()> {
self.reservation.try_grow(batch.get_array_memory_size())?;
self.reservation
.try_grow(get_record_batch_memory_size(&batch))?;
let batch_idx = self.batches.len();
self.batches.push((stream_idx, batch));
self.cursors[stream_idx] = BatchCursor {
Expand Down Expand Up @@ -141,7 +143,7 @@ impl BatchBuilder {
stream_cursor.batch_idx = retained;
retained += 1;
} else {
self.reservation.shrink(batch.get_array_memory_size());
self.reservation.shrink(get_record_batch_memory_size(batch));
}
retain
});
Expand Down
24 changes: 15 additions & 9 deletions datafusion/physical-plan/src/sorts/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ use crate::metrics::{
BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet,
};
use crate::sorts::streaming_merge::StreamingMergeBuilder;
use crate::spill::{read_spill_as_stream, spill_record_batches};
use crate::spill::{
get_record_batch_memory_size, read_spill_as_stream, spill_record_batches,
};
use crate::stream::RecordBatchStreamAdapter;
use crate::topk::TopK;
use crate::{
Expand Down Expand Up @@ -286,10 +288,12 @@ impl ExternalSorter {
}
self.reserve_memory_for_merge()?;

let size = input.get_array_memory_size();
let size = get_record_batch_memory_size(&input);

if self.reservation.try_grow(size).is_err() {
let before = self.reservation.size();
self.in_mem_sort().await?;

// Sorting may have freed memory, especially if fetch is `Some`
//
// As such we check again, and if the memory usage has dropped by
Expand Down Expand Up @@ -426,7 +430,7 @@ impl ExternalSorter {
let size: usize = self
.in_mem_batches
.iter()
.map(|x| x.get_array_memory_size())
.map(get_record_batch_memory_size)
.sum();

// Reserve headroom for next sort/merge
Expand Down Expand Up @@ -521,7 +525,8 @@ impl ExternalSorter {
// Concatenate memory batches together and sort
let batch = concat_batches(&self.schema, &self.in_mem_batches)?;
self.in_mem_batches.clear();
self.reservation.try_resize(batch.get_array_memory_size())?;
self.reservation
.try_resize(get_record_batch_memory_size(&batch))?;
let reservation = self.reservation.take();
return self.sort_batch_stream(batch, metrics, reservation);
}
Expand All @@ -530,7 +535,8 @@ impl ExternalSorter {
.into_iter()
.map(|batch| {
let metrics = self.metrics.baseline.intermediate();
let reservation = self.reservation.split(batch.get_array_memory_size());
let reservation =
self.reservation.split(get_record_batch_memory_size(&batch));
let input = self.sort_batch_stream(batch, metrics, reservation)?;
Ok(spawn_buffered(input, 1))
})
Expand Down Expand Up @@ -559,7 +565,7 @@ impl ExternalSorter {
metrics: BaselineMetrics,
reservation: MemoryReservation,
) -> Result<SendableRecordBatchStream> {
assert_eq!(batch.get_array_memory_size(), reservation.size());
assert_eq!(get_record_batch_memory_size(&batch), reservation.size());
let schema = batch.schema();

let fetch = self.fetch;
Expand Down Expand Up @@ -1185,9 +1191,9 @@ mod tests {

assert_eq!(metrics.output_rows().unwrap(), 10000);
assert!(metrics.elapsed_compute().unwrap() > 0);
assert_eq!(metrics.spill_count().unwrap(), 4);
assert_eq!(metrics.spilled_bytes().unwrap(), 38784);
assert_eq!(metrics.spilled_rows().unwrap(), 9600);
assert_eq!(metrics.spill_count().unwrap(), 3);
assert_eq!(metrics.spilled_bytes().unwrap(), 36000);
assert_eq!(metrics.spilled_rows().unwrap(), 9000);

let columns = result[0].columns();

Expand Down
208 changes: 205 additions & 3 deletions datafusion/physical-plan/src/spill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,16 @@
use std::fs::File;
use std::io::BufReader;
use std::path::{Path, PathBuf};
use std::ptr::NonNull;

use arrow::array::ArrayData;
use arrow::datatypes::SchemaRef;
use arrow::ipc::reader::FileReader;
use arrow::record_batch::RecordBatch;
use log::debug;
use tokio::sync::mpsc::Sender;

use datafusion_common::{exec_datafusion_err, Result};
use datafusion_common::{exec_datafusion_err, HashSet, Result};
use datafusion_execution::disk_manager::RefCountedTempFile;
use datafusion_execution::memory_pool::human_readable_size;
use datafusion_execution::SendableRecordBatchStream;
Expand Down Expand Up @@ -109,10 +111,83 @@ pub fn spill_record_batch_by_size(
Ok(())
}

/// Calculate total used memory of this batch.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💯 for this comment

///
/// This function is used to estimate the physical memory usage of the `RecordBatch`.
/// It only counts the memory of large data `Buffer`s, and ignores metadata like
/// types and pointers.
/// The implementation will add up all unique `Buffer`'s memory
/// size, due to:
/// - The data pointer inside `Buffer` are memory regions returned by global memory
/// allocator, those regions can't have overlap.
/// - The actual used range of `ArrayRef`s inside `RecordBatch` can have overlap
/// or reuse the same `Buffer`. For example: taking a slice from `Array`.
///
/// Example:
/// For a `RecordBatch` with two columns: `col1` and `col2`, two columns are pointing
/// to a sub-region of the same buffer.
///
/// {xxxxxxxxxxxxxxxxxxx} <--- buffer
/// ^ ^ ^ ^
/// | | | |
/// col1->{ } | |
/// col2--------->{ }
///
/// In the above case, `get_record_batch_memory_size` will return the size of
/// the buffer, instead of the sum of `col1` and `col2`'s actual memory size.
///
/// Note: Current `RecordBatch`.get_array_memory_size()` will double count the
/// buffer memory size if multiple arrays within the batch are sharing the same
/// `Buffer`. This method provides temporary fix until the issue is resolved:
/// <https://github.com/apache/arrow-rs/issues/6439>
pub fn get_record_batch_memory_size(batch: &RecordBatch) -> usize {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in TopK, RecordBatchStore still uses get_array_memory_size, do you think we should switch to get_record_batch_memory_size there as well?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think they should all be changed, however after changing them in TopK, some existing test cases might be tricky to fix, and more end-to-end tests should be added. So I plan to do it incrementally.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool -- can you possibly file a ticket to track any work that you know about? I can help file it / with the explanation as well

// Store pointers to `Buffer`'s start memory address (instead of actual
// used data region's pointer represented by current `Array`)
let mut counted_buffers: HashSet<NonNull<u8>> = HashSet::new();
let mut total_size = 0;

for array in batch.columns() {
let array_data = array.to_data();
count_array_data_memory_size(&array_data, &mut counted_buffers, &mut total_size);
}

total_size
}

/// Count the memory usage of `array_data` and its children recursively.
fn count_array_data_memory_size(
array_data: &ArrayData,
counted_buffers: &mut HashSet<NonNull<u8>>,
total_size: &mut usize,
) {
// Count memory usage for `array_data`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit, but you can probably add size of array_data.data_type itself

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this approach also missed several other metadata's memory size (like datatype, buffer pointers), they will be included in the more-comprehensive fix in arrow side.
For memory counting in large memory consumer, it's allowed to have certain inaccuracy, as long as major consumption is counted. However I agree this should be better documented.

for buffer in array_data.buffers() {
if counted_buffers.insert(buffer.data_ptr()) {
*total_size += buffer.capacity();
} // Otherwise the buffer's memory is already counted
}

if let Some(null_buffer) = array_data.nulls() {
if counted_buffers.insert(null_buffer.inner().inner().data_ptr()) {
*total_size += null_buffer.inner().inner().capacity();
}
}

// Count all children `ArrayData` recursively
for child in array_data.child_data() {
count_array_data_memory_size(child, counted_buffers, total_size);
}
Comment on lines +176 to +179
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it make sense to use #[recursive] to protect from cases with large nested data types?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've learned something new today.
Maybe apache/datafusion-sqlparser-rs#984 can be fixed with this attribute.
But this attribute come with performance overhead 🤔 https://docs.rs/recursive/latest/recursive/ I think stack overflow will happen after 10s of layers of recursion, which is likely for expression but I am not sure arrays can also have such deep nesting

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes I agree we don't need to annoate all recursive function calls -- only the ones that will become very large/deep

}

#[cfg(test)]
mod tests {
use super::*;
use crate::spill::{spill_record_batch_by_size, spill_record_batches};
use crate::test::build_table_i32;
use arrow::array::{Float64Array, Int32Array};
use arrow::datatypes::{DataType, Field, Int32Type, Schema};
use arrow::record_batch::RecordBatch;
use arrow_array::ListArray;
use datafusion_common::Result;
use datafusion_execution::disk_manager::DiskManagerConfig;
use datafusion_execution::DiskManager;
Expand Down Expand Up @@ -147,7 +222,7 @@ mod tests {
assert_eq!(cnt.unwrap(), num_rows);

let file = BufReader::new(File::open(spill_file.path())?);
let reader = arrow::ipc::reader::FileReader::try_new(file, None)?;
let reader = FileReader::try_new(file, None)?;

assert_eq!(reader.num_batches(), 2);
assert_eq!(reader.schema(), schema);
Expand Down Expand Up @@ -175,11 +250,138 @@ mod tests {
)?;

let file = BufReader::new(File::open(spill_file.path())?);
let reader = arrow::ipc::reader::FileReader::try_new(file, None)?;
let reader = FileReader::try_new(file, None)?;

assert_eq!(reader.num_batches(), 4);
assert_eq!(reader.schema(), schema);

Ok(())
}

#[test]
fn test_get_record_batch_memory_size() {
// Create a simple record batch with two columns
let schema = Arc::new(Schema::new(vec![
Field::new("ints", DataType::Int32, true),
Field::new("float64", DataType::Float64, false),
]));

let int_array =
Int32Array::from(vec![Some(1), Some(2), Some(3), Some(4), Some(5)]);
let float64_array = Float64Array::from(vec![1.0, 2.0, 3.0, 4.0, 5.0]);

let batch = RecordBatch::try_new(
schema,
vec![Arc::new(int_array), Arc::new(float64_array)],
)
.unwrap();

let size = get_record_batch_memory_size(&batch);
assert_eq!(size, 60);
Copy link
Contributor

@blaginin blaginin Nov 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My only concern with this PR is that the result of get_record_batch_memory_size differs from get_array_memory_size. For example, here batch.get_array_memory_size() would return 252 instead of 60.

This could be dangerous because the project would end up with two different methods of calculating memory sizes. I can imagine a scenario in the future, where we reserve memory based on one calculation method and shrink it using the result from the other. While the difference may not be large each time, over many repetitions or a large dataset, it could behave almost like a memory leak (but without actual memory), making debugging very challenging...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we completely switch to the new method, blocking the usage of the old one? Should we try to make two numbers match closely?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a great point. I also feel that this manual memory accounting is complex and error-prone. We’d better change all of it. (Maybe also use some RAII in the implementation, instead of manually growing and shrinking memory usage as we’re doing right now.)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Finding a way to automatically update the memory accounting is certainly a good idea in my mind. As we have mentioned, I think the most important thing will be to find a way to account for arrow buffers completely Then we can work it into DataFusion

}

#[test]
fn test_get_record_batch_memory_size_with_null() {
// Create a simple record batch with two columns
let schema = Arc::new(Schema::new(vec![
Field::new("ints", DataType::Int32, true),
Field::new("float64", DataType::Float64, false),
]));

let int_array = Int32Array::from(vec![None, Some(2), Some(3)]);
let float64_array = Float64Array::from(vec![1.0, 2.0, 3.0]);

let batch = RecordBatch::try_new(
schema,
vec![Arc::new(int_array), Arc::new(float64_array)],
)
.unwrap();

let size = get_record_batch_memory_size(&batch);
assert_eq!(size, 100);
}

#[test]
fn test_get_record_batch_memory_size_empty() {
// Test with empty record batch
let schema = Arc::new(Schema::new(vec![Field::new(
"ints",
DataType::Int32,
false,
)]));

let int_array: Int32Array = Int32Array::from(vec![] as Vec<i32>);
let batch = RecordBatch::try_new(schema, vec![Arc::new(int_array)]).unwrap();

let size = get_record_batch_memory_size(&batch);
assert_eq!(size, 0, "Empty batch should have 0 memory size");
}

#[test]
fn test_get_record_batch_memory_size_shared_buffer() {
// Test with slices that share the same underlying buffer
let original = Int32Array::from(vec![1, 2, 3, 4, 5]);
let slice1 = original.slice(0, 3);
let slice2 = original.slice(2, 3);

// `RecordBatch` with `original` array
// ----
let schema_origin = Arc::new(Schema::new(vec![Field::new(
"origin_col",
DataType::Int32,
false,
)]));
let batch_origin =
RecordBatch::try_new(schema_origin, vec![Arc::new(original)]).unwrap();

// `RecordBatch` with all columns are reference to `original` array
// ----
let schema = Arc::new(Schema::new(vec![
Field::new("slice1", DataType::Int32, false),
Field::new("slice2", DataType::Int32, false),
]));

let batch_sliced =
RecordBatch::try_new(schema, vec![Arc::new(slice1), Arc::new(slice2)])
.unwrap();

// Two sizes should all be only counting the buffer in `original` array
let size_origin = get_record_batch_memory_size(&batch_origin);
let size_sliced = get_record_batch_memory_size(&batch_sliced);

assert_eq!(size_origin, size_sliced);
}

#[test]
fn test_get_record_batch_memory_size_nested_array() {
let schema = Arc::new(Schema::new(vec![
Field::new(
"nested_int",
DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
false,
),
Field::new(
"nested_int2",
DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
false,
),
]));

let int_list_array = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
Some(vec![Some(1), Some(2), Some(3)]),
]);

let int_list_array2 = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
Some(vec![Some(4), Some(5), Some(6)]),
]);

let batch = RecordBatch::try_new(
schema,
vec![Arc::new(int_list_array), Arc::new(int_list_array2)],
)
.unwrap();

let size = get_record_batch_memory_size(&batch);
assert_eq!(size, 8320);
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

image

I think this line isn't covered, because I commented it out and all tests in this file passed. Let's add one more test?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed.
Also I believe there are some tools can automatically do similar checks (mutate code and make sure some test case will fail, if don't then there is some issue with test coverage), like https://mutants.rs/
We can investigate how to integrate them into the project 😄