Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix record batch memory size double counting #13377

Merged
merged 5 commits into from
Nov 15, 2024

Conversation

2010YOUY01
Copy link
Contributor

@2010YOUY01 2010YOUY01 commented Nov 12, 2024

Which issue does this PR close?

First step to fix #13089

Rationale for this change

Now record_batch.get_array_memory_size() will overestimate memory usage: If multiple array are pointing to the same underlying buffer, they will be counted repeatedly.
A more detailed explanation can be found in this PR's comment:

/// Calculate total used memory of this batch.
///
/// This function is used to estimate the physical memory usage of the `RecordBatch`. The implementation will add up all unique `Buffer`'s memory
/// size, due to:
/// - The data pointer inside `Buffer` are memory regions returned by global memory
/// allocator, those regions can't have overlap.
/// - The actual used range of `ArrayRef`s inside `RecordBatch` can have overlap
/// or reuse the same `Buffer`. For example: taking a slice from `Array`.
///
/// Example:
/// For a `RecordBatch` with two columns: `col1` and `col2`, two columns are pointing
/// to a sub-region of the same buffer.
///
/// [xxxxxxxxxxxxxxxxxxx] <--- buffer
///       ^    ^  ^    ^
/// 	  |    |  |    |
/// col1->[    ]  |    |    
/// col2--------->[    ]
///
/// In the above case, `get_record_batch_memory_size` will return the size of
/// the buffer, instead of the sum of `col1` and `col2`'s actual memory size.
///
/// Note: Current `RecordBatch`.get_array_memory_size()` will double count the
/// buffer memory size if multiple arrays within the batch are sharing the same
/// `Buffer`. This method provides temporary fix until the issue is resolved:
/// https://github.com/apache/arrow-rs/issues/6439
pub fn get_record_batch_memory_size(batch: &RecordBatch) -> usize {
...
}

This function is used for spilled execution to estimate physical memory usage, this overestimation caused many bugs in memory-limited sort/aggregation/join. For example, if there is a RecordBatch with 10 columns, all of 10 columns are sharing the same Buffer, then record_batch.get_array_memory_size() will return a 10X estimation, to make memory-limited query fail quite easily.

I believe #13089 is caused by this issue, and likely #9417 #10511 #12136 #11390

What changes are included in this PR?

Introduced a new get_record_batch_memory_size() to avoid double count, by using a internal HashSet to recognize reused buffers.
While @waynexia is working on a comprehensive solution in arrow-rs apache/arrow-rs#6439, I think it's useful to introduce this temporary fix in DataFusion due to:

  • After fixing record_batch.get_array_memory_size() with memory overcounting, it's non trivial to fix all tests at once (manual memory tracking is tricky, when I was trying to make one external aggregate query to run, it took me a while to figure out why one test case fail after a change)
  • If we can adopt this temporary fix, we can gradually swap out record_batch.get_array_memory_size(), and add regression tests for memory-limited query bugs. After we have a fix in arrow, the temporary fix function can be deprecated and replace with the origin one more easily.

Are these changes tested?

Yes

Are there any user-facing changes?

No

@github-actions github-actions bot added physical-expr Physical Expressions core Core DataFusion crate labels Nov 12, 2024
@blaginin
Copy link
Contributor

blaginin commented Nov 12, 2024

This PR does indeed fix #10511 😀. I just tested the branch, and the code that crashes in main works perfectly here

Comment on lines +173 to +176
// Count all children `ArrayData` recursively
for child in array_data.child_data() {
count_array_data_memory_size(child, counted_buffers, total_size);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it make sense to use #[recursive] to protect from cases with large nested data types?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've learned something new today.
Maybe apache/datafusion-sqlparser-rs#984 can be fixed with this attribute.
But this attribute come with performance overhead 🤔 https://docs.rs/recursive/latest/recursive/ I think stack overflow will happen after 10s of layers of recursion, which is likely for expression but I am not sure arrays can also have such deep nesting

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes I agree we don't need to annoate all recursive function calls -- only the ones that will become very large/deep

counted_buffers: &mut HashSet<NonNull<u8>>,
total_size: &mut usize,
) {
// Count memory usage for `array_data`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit, but you can probably add size of array_data.data_type itself

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this approach also missed several other metadata's memory size (like datatype, buffer pointers), they will be included in the more-comprehensive fix in arrow side.
For memory counting in large memory consumer, it's allowed to have certain inaccuracy, as long as major consumption is counted. However I agree this should be better documented.

/// buffer memory size if multiple arrays within the batch are sharing the same
/// `Buffer`. This method provides temporary fix until the issue is resolved:
/// <https://github.com/apache/arrow-rs/issues/6439>
pub fn get_record_batch_memory_size(batch: &RecordBatch) -> usize {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in TopK, RecordBatchStore still uses get_array_memory_size, do you think we should switch to get_record_batch_memory_size there as well?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think they should all be changed, however after changing them in TopK, some existing test cases might be tricky to fix, and more end-to-end tests should be added. So I plan to do it incrementally.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool -- can you possibly file a ticket to track any work that you know about? I can help file it / with the explanation as well


let size = get_record_batch_memory_size(&batch);
assert_eq!(size, 8320);
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

image

I think this line isn't covered, because I commented it out and all tests in this file passed. Let's add one more test?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed.
Also I believe there are some tools can automatically do similar checks (mutate code and make sure some test case will fail, if don't then there is some issue with test coverage), like https://mutants.rs/
We can investigate how to integrate them into the project 😄

.unwrap();

let size = get_record_batch_memory_size(&batch);
assert_eq!(size, 60);
Copy link
Contributor

@blaginin blaginin Nov 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My only concern with this PR is that the result of get_record_batch_memory_size differs from get_array_memory_size. For example, here batch.get_array_memory_size() would return 252 instead of 60.

This could be dangerous because the project would end up with two different methods of calculating memory sizes. I can imagine a scenario in the future, where we reserve memory based on one calculation method and shrink it using the result from the other. While the difference may not be large each time, over many repetitions or a large dataset, it could behave almost like a memory leak (but without actual memory), making debugging very challenging...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we completely switch to the new method, blocking the usage of the old one? Should we try to make two numbers match closely?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a great point. I also feel that this manual memory accounting is complex and error-prone. We’d better change all of it. (Maybe also use some RAII in the implementation, instead of manually growing and shrinking memory usage as we’re doing right now.)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Finding a way to automatically update the memory accounting is certainly a good idea in my mind. As we have mentioned, I think the most important thing will be to find a way to account for arrow buffers completely Then we can work it into DataFusion

@alamb
Copy link
Contributor

alamb commented Nov 13, 2024

Thanks @2010YOUY01 -- will look at this later today or tomorrow


// Merge operation needs extra memory to do row conversion, so make the
// memory limit larger.
let mem_limit = partition_size * 2;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be introduced as DataFusion parameter so the user can configure the memory allocation realm. I got some feeling the mem is data dependent, depending on datatypes/data being processed

Copy link
Contributor

@comphead comphead left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

really love the documentation, so no need to go through the code.

One thing to mentioned is how fast this method is? as I believe the method will be called frequently

@2010YOUY01
Copy link
Contributor Author

One thing to mentioned is how fast this method is? as I believe the method will be called frequently

This is a very good point, I think when doing the same fix at arrow side, we should cache the result inside RecordBatch (if they're immutable)
I will run some benchmarks

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @2010YOUY01 and @blaginin -- this PR makes a lot of sense to me

Filing follow on tickets would be a good idea in my mind

@@ -109,10 +111,80 @@ pub fn spill_record_batch_by_size(
Ok(())
}

/// Calculate total used memory of this batch.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💯 for this comment

/// buffer memory size if multiple arrays within the batch are sharing the same
/// `Buffer`. This method provides temporary fix until the issue is resolved:
/// <https://github.com/apache/arrow-rs/issues/6439>
pub fn get_record_batch_memory_size(batch: &RecordBatch) -> usize {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool -- can you possibly file a ticket to track any work that you know about? I can help file it / with the explanation as well

Comment on lines +173 to +176
// Count all children `ArrayData` recursively
for child in array_data.child_data() {
count_array_data_memory_size(child, counted_buffers, total_size);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes I agree we don't need to annoate all recursive function calls -- only the ones that will become very large/deep

.unwrap();

let size = get_record_batch_memory_size(&batch);
assert_eq!(size, 60);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Finding a way to automatically update the memory accounting is certainly a good idea in my mind. As we have mentioned, I think the most important thing will be to find a way to account for arrow buffers completely Then we can work it into DataFusion

let slice1 = original.slice(0, 3);
let slice2 = original.slice(2, 3);

let batch =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💯

.unwrap();

let size = get_record_batch_memory_size(&batch);
// The size should only count the shared buffer once
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be good in my mind to change this test so that rather than testing a hard coded size, it would compute the size of a single slice and verify that is the same

that way the test would verify the actual invariant (that the sizes are the same) rather than relying on keeping the two values in sync

@2010YOUY01
Copy link
Contributor Author

Thank you all for the feedbacks! I've updated the followings:

Copy link
Contributor

@comphead comphead left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm thanks @2010YOUY01

@comphead comphead merged commit 172cf8d into apache:main Nov 15, 2024
25 checks passed
@2010YOUY01 2010YOUY01 deleted the fix-batch-size branch November 16, 2024 02:51
findepi pushed a commit to sdf-labs/arrow-datafusion that referenced this pull request Nov 25, 2024
* Fix record batch memory size double counting
findepi pushed a commit to sdf-labs/arrow-datafusion that referenced this pull request Nov 25, 2024
* Fix record batch memory size double counting
findepi pushed a commit to sdf-labs/arrow-datafusion that referenced this pull request Nov 26, 2024
* Fix record batch memory size double counting
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
core Core DataFusion crate physical-expr Physical Expressions
Projects
None yet
Development

Successfully merging this pull request may close these issues.

External aggregation reserves more memory than actual usage
4 participants