Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add IoObjectStore that uses main runtime for network requests #248

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from

Conversation

matthewmturner
Copy link
Collaborator

@matthewmturner matthewmturner commented Nov 19, 2024

Add's a wrapper ObjectStore that uses spawn_io / the main tokio runtime for making network requests instead of the DedicatedExecutor which should only be used for CPU bound work.

@matthewmturner matthewmturner requested a review from alamb November 19, 2024 15:13
@matthewmturner
Copy link
Collaborator Author

@alamb FYI

Comment on lines 34 to 38
async fn get(&self, location: &Path) -> Result<GetResult> {
let location = location.clone();
let store = self.inner.clone();
spawn_io(async move { store.get(&location).await }).await
}
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will be following this approach for all ObjectStore methods

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

YES! This is very cool

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perfect -- this is great @matthewmturner -- I was thinking about writing just such a thing to add to DataFusion. If you get it in here I very much plan to upstream it (will try and do so tomorrow)

@matthewmturner matthewmturner marked this pull request as ready for review November 20, 2024 15:55
@matthewmturner
Copy link
Collaborator Author

Just want to add a test for this then plan to merge it

@alamb
Copy link
Contributor

alamb commented Nov 20, 2024

Just want to add a test for this then plan to merge it

I am making great progress on the example

I als plan to yoink this code too

@matthewmturner
Copy link
Collaborator Author

matthewmturner commented Nov 20, 2024

@alamb do you envision having the dedicated executor eventually live in the main datafusion repo? If not (or even as an interim solution while its iterated on) I could imagine having a library for dft that exports the DedicatedExecutor and IoObjectStore so others can more easily use it.

I.e. an example is great to help understand and see how this part of execution works - but exposing the functionality directly should make it easier to just pick up and use.

@alamb
Copy link
Contributor

alamb commented Nov 20, 2024

@alamb do you envision having the dedicated executor eventually live in the main datafusion repo? If not (or even as an interim solution while its iterated on) I could imagine having a library for dft that exports the DedicatedExecutor and IoObjectStore so others can more easily use it.

I envision DedicatedExecutor and IoObjectStore being directly in DataFusion (I can't really write the example without them :) )

Stay tuned 📺

@tustvold
Copy link

This is problematic, as the returned GetResult contains a stream that will then "migrate" across the runtime boundary. It is at best undocumented what the implications of this are. Other issues will be found with things like put_multipart and friends.

There is some further context on here - apache/arrow-rs#4040 (comment)

I would strongly encourage implementing the IO dispatch at an application boundary that makes, e.g. AsyncFileReader, the ObjectStore trait is not designed to allow it to be shimmed in this way.

@alamb
Copy link
Contributor

alamb commented Nov 20, 2024

I would strongly encourage implementing the IO dispatch at an application boundary that makes, e.g. AsyncFileReader, the ObjectStore trait is not designed to allow it to be shimmed in this way.

I don't fully understand this - perhaps we can discuss in the context of my example. I am pretty close to having something to show for discussion

Specifically my vision is to have an example that shows how to run a DataFUsion plan with CPU on one threadpool and IO on another.

@tustvold
Copy link

tustvold commented Nov 20, 2024

I don't fully understand this - perhaps we can discuss in the context of my example

Rather than wrapping the ObjectStore trait, you would wrap the calls within DF that interact with ObjectStore (or any other interface that performs IO), e.g. AsyncFileReader, the internal methods of ListingTable, etc... The problem arises because a number of the APIs exposed by ObjectStore are exposed as streams, in some cases, e.g. GetResult the behaviour of tokio may sort of work (apache/arrow-rs#4040 (comment)), but others like list will simply not work correctly - polling the stream will potentially initiate a new request on the calling runtime.

Ultimately DF needs to separate IO based activities from CPU-based activities, and overloading this onto the ObjectStore interface won't work reliably.

@alamb
Copy link
Contributor

alamb commented Nov 20, 2024

Ultimately DF needs to separate IO based activities from CPU-based activities, and overloading this onto the ObjectStore interface won't work reliably.

Thank you -- I have some ideas about this. Will keep working (I am thinking of some way to move the work of one stream to another runtime)

@matthewmturner
Copy link
Collaborator Author

@alamb perhaps this is relevant https://stackoverflow.com/a/78094264 (answer from Alice Rhyl)

@tustvold
Copy link

tustvold commented Nov 20, 2024

Or you could just spawn IO at a non-streaming application boundary... This is simpler and much easier to reason about... But I guess I am now repeating myself...

Edit: TBC the TCPStream is deep in the guts of our HTTP client, there aren't APIs that would allow us to move it, nor would it make sense given the whole purpose is to keep IO off the DF threads

@matthewmturner
Copy link
Collaborator Author

thanks @tustvold, appreciate your insight on this. need to digest implications of this a bit more to figure out next steps. It's not immediately clear to me if interfaces like AsyncFileReader are tied to parquet (i believe thats just in the parquet crate - im hoping this will work in a way that is agnostic to underlying file type. I havent had the time to look much into this yet though so maybe i just need to spend more time with it.

@tustvold
Copy link

im hoping this will work in a way that is agnostic to underlying file type

Parquet will likely need to be handled separately from CSV/JSON/Avro because of the very different IO pattern, but the IO for those three should be similar enough to allow sharing. It has been long time since I worked on it, but I at least remember extracting a common FileStream trait or something similar. I'd hope something similar would be possible for the write side.

@matthewmturner
Copy link
Collaborator Author

i am going to mark this as draft for the time being while these more fundamental issues are resolved.

@matthewmturner matthewmturner marked this pull request as draft November 21, 2024 14:10
@alamb
Copy link
Contributor

alamb commented Nov 21, 2024

My hope/plan is to figure out a good way to properly separate IO/Compute in the context of

I hope to have a chance to work on it more tomorrow / over the weekend

@djanderson
Copy link

I'm super interested in the outcome here. I've got a production flight server that I'm trying to replace w/ flight sql + DataFusion, but I keep hitting executor starvation issue and have been tracking the conversation around dedicated runtimes closely. I ported DedicatedExecutor and CrossRtStream from Influx last week (good timing I know) and tried the IoObjectStore approach yesterday since it was super easy to hack in. Results are better but I am still hitting starvation when data transfer passes a couple hundred MB.

That is all to say, if I can help test any approaches, lmk.

@matthewmturner
Copy link
Collaborator Author

@djanderson just to confirm i understand - is the dedicated executor being starved / the IO it needs to perform cant proceed? or the main runtime is being starved?

@djanderson
Copy link

I believe that the main runtime is being starved because it's manifesting itself as dropped network connections, either between MinIO and the flight sql server (more often) or between the flight sql server and the python client I'm using to test.

That said I just got tokio-console set up and I'm still learning how to read it. Hopefully I can use it to make sure that I'm in fact getting all my IO running on the main thread via spawn_io and didn't miss some place.

@rohitrastogi
Copy link

rohitrastogi commented Dec 5, 2024

Any updates, @djanderson? I haven't found an easy way to determine which tasks ran on which runtime in the console. I also took a similar approach to using the DedicatedExecutor at the ObjectStore boundary in my project, but still see periodic S3 timeouts a la: apache/arrow-rs#5882

@alamb, do you have any suggestions on the number of threads to allocate for cpu bound tasks vs IO bound ones relative to number of available cores? I will try lowering the priority of the threads in the CPU runtime to see if that makes a difference.

@alamb
Copy link
Contributor

alamb commented Dec 5, 2024

@alamb, do you have any suggestions on the number of threads to allocate for cpu bound tasks vs IO bound ones relative to number of available cores? I will try lowering the priority of the threads in the CPU runtime to see if that makes a difference.

I recommend N-1 cores for the CPU bound tasks (so there is always a full core available to do network requests)

Also, it is important that you actually run the stream in the separate runtime.

I have sketched this out in the different_runtime_advanced example here:

https://github.com/apache/datafusion/pull/13424/files#diff-fde0efd9753d1a0d0ebbfb605816c4ea770aa924896bac7ceecdeed2934e491aR146-R211

I am still playing around with making it easier to use the DedicatedExecutor paradigm, which is why I don't have that PR up for review yet

@alamb
Copy link
Contributor

alamb commented Dec 5, 2024

FWIW @adriangb reports success using this procedure here:

@djanderson
Copy link

@rohitrastogi no updates from me, still also seeing the periodic timeouts and also didn't get much insight via tokio-console.

What I'm doing right now is trying to implement some of the file range caching ideas from https://blog.haoxp.xyz/posts/caching-datafusion/, in the hopes that if i cache part of the file before the failure a retry might get me over the line. Still keeping a close eye on this work, appreciate everyone's efforts!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants