
FFI support for versions and alternate tokio runtimes #13937

Merged
19 commits merged into apache:main from feat/ffi_enter_tokio_runtime on Feb 1, 2025

Conversation

@timsaucer (Contributor) commented on Dec 29, 2024

Which issue does this PR close?

Rationale for this change

During testing with delta-kernel-rs and datafusion-table-providers, it was discovered that there are multiple cases where table providers spawn tasks using tokio. Since these modules may be running in a different thread than the main executable, they do not have the tokio runtime available. Here we optionally add a tokio runtime reference so we can enter the runtime during calls to the record batch stream.

A recent change to DataFusion core required changing the API used across the FFI boundary, so attempting to load a module built against DF43 into DF44 will break. This PR addresses that by adding a version() function that returns the major version the module was compiled against, allowing users to check for compatibility before attempting to use modules across the boundary.

What changes are included in this PR?

We add a reference to the tokio runtime that is entered during calls to the record batch stream, so we can spawn async tasks during execution without panicking.

We add a single version() function at the root of the FFI API.

We add an integration test that builds a module and loads it from another executable to test the FFI boundary. This has both a synchronous and an asynchronous table provider.

Are these changes tested?

These are tested against datafusion-python with a python wrapper around delta-kernel-rs. Additionally an integration test is added to this repository.

Are there any user-facing changes?

This exposes a new version() function in the FFI module and adds a new method for creating FFI_TableProvider with the runtime. The existing From<> implementation remains for users who are not spawning via tokio.
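For illustration, a library author exposing a provider might wire things up roughly as follows. This is a minimal sketch: the exact parameter list of FFI_TableProvider::new and the import paths shown here are assumed from the PR description, and export_provider is a hypothetical helper.

use std::sync::Arc;

use datafusion::datasource::TableProvider;
use datafusion_ffi::table_provider::FFI_TableProvider;
use tokio::runtime::Handle;

/// Wrap a table provider for export across the FFI boundary, passing along
/// the handle of a tokio runtime owned by the library so that any async work
/// spawned while streaming record batches has a runtime to enter.
fn export_provider(
    provider: Arc<dyn TableProvider + Send>,
    runtime: Handle,
) -> FFI_TableProvider {
    // Assumed constructor shape per this PR's description: the provider, a
    // filter-pushdown support flag, and an optional runtime handle
    // (None keeps the previous behavior).
    FFI_TableProvider::new(provider, false, Some(runtime))
}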

Comment on lines 29 to 36
/// Returns the major version of the FFI implementation. If the API evolves,
/// we use the major version to identify compatibility over the unsafe
/// boundary.
pub extern "C" fn version() -> u64 {
let version_str = env!("CARGO_PKG_VERSION");
let version = semver::Version::parse(version_str).expect("Invalid version string");
version.major
}


I don't see this used anywhere in this PR to guard the unsafe boundary; please ignore if the PR is still WIP.

@timsaucer (author):

I'm adding a note in the description that this function is intended to be used by projects exposing libraries over FFI. Since that may be done in a couple of different ways, such as dynamically linking a library or going through Python, it's up to downstream users to call this function and check whether they are compatible. Also, since the FFI bindings may not change across some DataFusion versions, you may have a case where multiple versions are compatible even without the same major version.
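As a rough illustration of what such a downstream check could look like once the host has resolved the loaded module's version symbol: check_ffi_compatibility and foreign_version below are hypothetical names, and strict major-version equality is simply the most conservative policy given the comment above.

/// Compare the major version reported by a loaded FFI module against the
/// major version this host was compiled with, refusing to proceed on a
/// mismatch.
fn check_ffi_compatibility(foreign_version: u64) -> Result<(), String> {
    // Major version of the datafusion-ffi crate this host was built against.
    let local_version = datafusion_ffi::version();

    if foreign_version == local_version {
        Ok(())
    } else {
        Err(format!(
            "incompatible datafusion-ffi versions: host has {local_version}, \
             loaded module has {foreign_version}"
        ))
    }
}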

@alamb (Contributor) commented on Jan 16, 2025

Is this PR ready for review @timsaucer? I noticed a few errors but I didn't look carefully at them yet (I figured I would ask you first).

@timsaucer (author):

Yes, it's basically ready. I just need to fix the fact that the path built in CI for the integration test differs from what builds locally on my machine. I'm taking a look now.

@timsaucer (author):

OK, it looks like the ci profile wasn't supported in the utility that looks for the libraries, so I had to write a small custom function to find it for CI. Hopefully CI will pass, and then this is ready to review.

@timsaucer force-pushed the feat/ffi_enter_tokio_runtime branch from d2e6287 to ef5e8b6 on January 17, 2025 at 00:23
@timsaucer (author):

@alamb Ready for review.

@alamb (Contributor) commented on Jan 18, 2025

Starting to check this one out

@alamb mentioned this pull request on Jan 18, 2025
@alamb (Contributor) left a comment:

Thanks @timsaucer -- the version call makes a lot of sense. I am not sure exactly about the multiple thread pools thing (starting the other runtime makes sense, but not what you are doing with it)

Also, I think it would be nice to avoid another crate if possible

// specific language governing permissions and limitations
// under the License.

use std::{any::Any, fmt::Debug, sync::Arc};
Contributor:

Is there any reason we have to make this its own crate? Maybe it could just be a test binary in the datafusion-ffi crate

That would mean putting it in datafusion/ffi/tests/ffi_integration.rs or something
And then it would get run via

cargo test --test ffi_integration

Or

cargo test -p datafusion-ffi --tests

@timsaucer (author):

I've moved this as requested. I had to do a little CI work so that these integration tests only build when testing; I ended up putting them behind a feature flag instead.

sync::{broadcast, mpsc},
};

use crate::create_record_batch;
Contributor:

I suggest putting a one-line comment at the top of this file with the context of what it is being used for.

Something like

Suggested change
use crate::create_record_batch;
//! Demonstrates how to use a tokio threadpool to run async functions across the ffi boundary

Contributor:

It might also help to give some overview context. Something like

//! we can't share a tokio runtime across the ffi boundary so 
//! in order to run `async` functions in a table provider called from
//! ffi, the runtime must be supplied.
//! 
//! In this example, we create a new runtime that executes on a newly created
//! thread.
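A minimal, generic sketch of that pattern, assuming a dedicated thread owns the runtime and blocks until told to shut down while its handle is shared with the code called over FFI (spawn_runtime_thread is a hypothetical helper, not the test code in this PR):

use std::thread;

use tokio::{
    runtime::{Handle, Runtime},
    sync::oneshot,
};

/// Spawn a dedicated thread that owns a tokio runtime. Returns the runtime
/// handle (which can be handed across the FFI boundary) and a shutdown
/// sender; sending on it, or dropping it, lets the thread exit.
fn spawn_runtime_thread() -> (Handle, oneshot::Sender<()>) {
    let (handle_tx, handle_rx) = std::sync::mpsc::channel();
    let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();

    thread::spawn(move || {
        let runtime = Runtime::new().expect("failed to build tokio runtime");
        handle_tx
            .send(runtime.handle().clone())
            .expect("handle receiver dropped");
        // Keep the runtime alive until a shutdown signal arrives (or the
        // sender is dropped), then drop it and let the thread end.
        let _ = runtime.block_on(shutdown_rx);
    });

    let handle = handle_rx.recv().expect("runtime thread failed to start");
    (handle, shutdown_tx)
}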

datafusion/ffitest/src/async_provider.rs (outdated; resolved)
}
});

let _ = shutdown.blocking_recv();
Contributor:

I don't think this is called until batch_sender returns an error (i.e., the receiver is dropped).

Thus I don't think this shutdown will ever be called.

To properly cancel this sender, I think you would have to join! the call to batch_sender.send with shutdown.recv().

@timsaucer (author):

When the AsyncTableProvider goes out of scope because the execution plan is complete, it sends a signal on the shutdown channel. I implemented the Drop trait to make this happen. That causes the spawned thread to exit.
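In outline, that arrangement looks something like the sketch below (AsyncProvider and its field names are illustrative, not the exact types from the test):

use tokio::sync::broadcast;

/// Illustrative table provider that owns the sending half of a shutdown
/// channel; the background thread driving the runtime waits on the
/// receiving half.
struct AsyncProvider {
    shutdown: broadcast::Sender<()>,
}

impl Drop for AsyncProvider {
    fn drop(&mut self) {
        // Signal the background thread to exit once execution is complete.
        // Ignore the error case where the receiver has already gone away.
        let _ = self.shutdown.send(());
    }
}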


#[allow(clippy::disallowed_methods)]
tokio::spawn(async move {
// Nothing to do. We just need to simulate an async
Contributor:

I don't understand this -- normally we wouldn't spawn a task from poll_next

@timsaucer (author):

This came up in two places when I was testing table providers in datafusion-python: one of the table providers in datafusion-contrib, and a Python wrapper I tried adding to delta-kernel, adapted from spice.ai's GitHub repo. In both cases there are calls to tokio::spawn during the stream calls. When you use those as Rust dependencies it's not a problem, because you are within the same runtime.

Does that make sense? I can track down the exact calls where I came across this. It's the primary impetus behind this PR.
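For context, a minimal sketch of the failure mode being reproduced and how a supplied handle avoids it; this is generic tokio usage, not code from the PR:

use tokio::runtime::Runtime;

fn main() {
    let runtime = Runtime::new().expect("failed to build tokio runtime");
    let handle = runtime.handle().clone();

    // Calling tokio::spawn on a thread with no runtime context panics
    // ("must be called from the context of a Tokio 1.x runtime"). That is
    // the failure an FFI table provider hits when its stream internally
    // calls tokio::spawn from the host's plain thread.

    std::thread::spawn(move || {
        // Entering the handle installs a runtime context for this thread,
        // so the same spawn now succeeds instead of panicking.
        let _guard = handle.enter();
        tokio::spawn(async {
            // Stand-in for the async work a real provider would do.
        });
    })
    .join()
    .expect("worker thread panicked");
}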

}
}

impl Stream for AsyncTestRecordBatchStream {
Contributor:

I am not sure about this -- what are you trying to do? Show how to run a async function from the remote stream?

I think the "better" way to do this is via methods in the futures crate to create a stream.

Here is an example:

https://github.com/goldmedal/datafusion-llm-function/pull/1/files#diff-b6171a786c2996787af50100adbbc2f85d81b9ef9088b2688e5c03ea1eee7edeR143

If you stub out an example function you are trying to adapt I can try and help with the futures-fu?

@timsaucer (author):

I'm not really trying to use this as a demonstration of the best way to use it, but as a unit test to reproduce the same problems I ran into using some of the table providers (see comment above)
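For readers curious what the futures-based construction suggested above could look like, here is a generic sketch (example_stream and its schema are invented for illustration; this is not the code from the linked example):

use std::sync::Arc;

use datafusion::arrow::array::Int32Array;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::DataFusionError;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::SendableRecordBatchStream;
use futures::{stream, StreamExt};

/// Build a record batch stream from an async producer using stream
/// combinators rather than a hand-written Stream implementation.
fn example_stream() -> SendableRecordBatchStream {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let batch_schema = Arc::clone(&schema);

    let batches = stream::iter(0..3).then(move |start| {
        let schema = Arc::clone(&batch_schema);
        async move {
            // Stand-in for awaiting a real async source (object store, network, ...).
            let values = Int32Array::from(vec![start, start + 1, start + 2]);
            RecordBatch::try_new(schema, vec![Arc::new(values)])
                .map_err(DataFusionError::from)
        }
    });

    Box::pin(RecordBatchStreamAdapter::new(schema, batches))
}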

@github-actions bot added the development-process (Related to development process of DataFusion) label on Jan 25, 2025
@timsaucer (author):

CI is repaired after moving the crate out. @kevinjqliu, would you be able to review?

@kevinjqliu left a comment:

generally LGTM

I built this PR locally and ran it against the iceberg-rust tests that use TableProvider.

To build and run locally (until this PR is merged):
- in `datafusion`, fetch the latest changes from `feat/ffi_enter_tokio_runtime` branch
- in `datafusion-python`, change package to reference latest datafusion* repos and build locally
    - `uv run --no-project maturin build`
    - check target `ll target/wheels/datafusion-44.0.0-cp38-abi3-macosx_11_0_arm64.whl`
- in `iceberg-rust`, build `bindings/python` and run tests
    - `cd bindings/python`
    - `hatch run dev:develop`
    - `hatch run dev:test`

.github/workflows/rust.yml (outdated; resolved)
@alamb changed the title from "Feat/ffi enter tokio runtime" to "FFI support for versions and alternate tokio runtimes" on Jan 30, 2025
@alamb (Contributor) left a comment:

Thank you @timsaucer and @kevinjqliu -- I think this one looks good to go. Really well commented and tested, as always 🙏

}

struct AsyncTestRecordBatchStream {
batch_request: mpsc::Sender<bool>,
Contributor:

FWIW I think you could use a tokio::stream adapter here


let mut this = self.as_mut();

#[allow(clippy::disallowed_methods)]
tokio::spawn(async move {
Contributor:

kk

@alamb commented on Jan 30, 2025

@kevinjqliu:
CI is green now! Thanks for the fix and thanks @alamb for the review :)

@timsaucer (author):

I'm still seeing one documentation test failure. I'm tweaking it, but it doesn't reproduce on my local machine so it's taking a while to work through

@kevinjqliu:
All checks have passed
27 successful checks

woot! thanks for following up

@timsaucer merged commit 9d1bfc1 into apache:main on Feb 1, 2025
27 checks passed
@timsaucer deleted the feat/ffi_enter_tokio_runtime branch on February 1, 2025 at 01:02
cj-zhukov pushed a commit to cj-zhukov/datafusion that referenced this pull request Feb 3, 2025
* Add optional reference to tokio runtime for table providers

* Add function to return the library version over FFI

* Resolve clippy warnings

* Function does not need to be defined as unsafe

* Add integration test for FFI table provider

* Add version call on FFI integration test

* Making use explicit on crate to try to get CI to ensure it builds first

* Add license text

* Fix unit test to find deps in ci profile

* Remove ffitest crate and put test lib behind a feature flag

* Add integation-tests feature to ci tests

* Add integration-tests feature to CI run

* Add clarifying text

* Update CI to only run integration tests for certain checks

* When the feature integtation-tests is enabled, we get conflicting library entries for the example table provider and integration test, so disable the example during CI run

* Remove typo

* Specify each excluded crate separately

* Doc tests did not need the exclusion

* Integration tests shouldn't need doc test
Labels
development-process Related to development process of DataFusion
Projects
None yet
Development

Successfully merging this pull request may close these issues.

FFI Execution Plans that spawn threads panic
Add version checking to FFI crate
3 participants