Refacto providers store #1

Merged · 17 commits · Oct 6, 2021

Conversation

@rdettai (Owner) commented Sep 24, 2021

Rationale for this change

Add ObjectStore support to apache#1010

What changes are included in this PR?

  • Include the ObjectStore abstractions in the new implementations of FileFormat, TableProvider and ExecutionPlan
  • Change the object store API (a sketch of the resulting reader trait follows this list):
    • add ObjectReader.sync_chunk_reader to be compatible with the Parquet ChunkReader trait
    • ObjectReader.sync_chunk_reader returns Read instead of AsyncRead to accommodate the input types of the CSV, JSON, Avro and Parquet reader APIs
    • keep ObjectReader.chunk_reader in anticipation of the new async reader API
    • ObjectStore.file_reader() now takes SizedFile instead of FileMeta because only path and size are required there
  • Remove the useless Seek trait bound in avro_to_arrow
  • Add some tests to better cover the limit behaviour at the file_format level
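
A minimal sketch of the reader trait shape after these changes, based on the signatures quoted later in this thread (the real trait has more methods, and the error type here is a stand-in):

use std::io::Read;
use std::sync::Arc;
use tokio::io::AsyncRead;

// Stand-in for the crate's own Result type.
type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;

trait ObjectReader {
    /// Async variant, kept in anticipation of the future async reader API.
    fn chunk_reader(&self, start: u64, length: usize) -> Result<Arc<dyn AsyncRead>>;
    /// Sync variant, compatible with the Parquet ChunkReader trait.
    fn sync_chunk_reader(&self, start: u64, length: usize)
        -> Result<Box<dyn Read + Send + Sync>>;
}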

Are there any user-facing changes?

ObjectStore is user-facing but probably not used by any users yet.

@rdettai (Owner, Author) commented Sep 24, 2021

@alamb @houqp @yjshen apart from cosmetic changes, the biggest constraint here is that I had to transform

async fn chunk_reader(&self, start: u64, length: usize) -> Result<Arc<dyn AsyncRead>>;

to

fn chunk_reader(&self, start: u64, length: usize) -> Result<Box<dyn Read + Send + Sync>>;

I would argue that:

  • Changing Arc to Box was expected: Read::read takes &mut self, which is not really compatible with Arc (a short sketch follows this list)
  • Making the method sync and replacing AsyncRead with Read is really not what we would want here. But we are currently limited by the underlying file format readers (CSV, JSON, Parquet...) that all require Read. Note that this is the kind of use case that cloudfuse-io/cloud-readers-rs is meant to solve.
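
To illustrate the Arc vs Box point, a minimal sketch (read_some is a hypothetical helper, not code from this PR):

use std::io::Read;

// Box<dyn Read> uniquely owns the reader, so the `&mut self` that
// Read::read requires is always available. With Arc<dyn Read> this would
// not compile: an Arc only hands out shared references.
fn read_some(mut reader: Box<dyn Read + Send + Sync>) -> std::io::Result<usize> {
    let mut buf = [0u8; 4096];
    reader.read(&mut buf)
}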

@rdettai (Owner, Author) commented Sep 24, 2021

I see a possible performance impact because of the dynamic dispatch on the reader (read() might be called a huge number of times). It might be a bit hard to solve though. Maybe something like enum_dispatch could help out, but I fear that it would make the API annoying. Any thoughts?
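
One mitigation that does not touch the API -- a sketch, not part of this PR: wrap the boxed reader in a std::io::BufReader, so the virtual call happens once per buffer refill instead of once per small read.

use std::io::{BufReader, Read};

// The 1 MiB capacity is an arbitrary choice for illustration.
fn buffered(reader: Box<dyn Read + Send + Sync>) -> BufReader<Box<dyn Read + Send + Sync>> {
    BufReader::with_capacity(1024 * 1024, reader)
}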

@rdettai (Owner, Author) commented Sep 24, 2021

I am wondering if the ObjectStoreRegistry shouldn't be moved out of the FileFormat. Maybe the logical plan should ignore this registry entirely, and we should wait until the scan() operation to resolve it. This would make it more obvious in systems like Ballista that we cannot resolve paths on the client; we need to do it from the scheduler (upon conversion from logical plan to physical plan, or in specific endpoints that resolve the schema).

@alamb commented Sep 24, 2021

Will try and review this later today or tomorrow

@houqp left a comment

Really cool to see object store in action! Left two minor comments :)

/// Path of the file
pub path: String,
/// Last time the file was modified in UTC
pub last_modified: Option<chrono::DateTime<Utc>>,
@houqp commented Sep 26, 2021

Curious, what's the reason for dropping this optional field? It is an attribute that the delta lake table provider would need to rely on. Perhaps other table formats might rely on it as well.

@rdettai (Owner, Author)

Sorry for that, just an oversight. I'll look at how it is used exactly and add it back with some comments 🙂

@rdettai (Owner, Author)

I tried to understand what last_modified is used for exactly in the delta reader, but I couldn't quite get it... According to my understanding, the modification dates that should be used are sourced from the catalog (for example this log file), not the object store.

@houqp

The main use-case in delta is to get the timestamp for a specific version commit. The commit entry you linked contains timestamp metadata for each individual action, but not the full commit/version. The object/commit timestamp is currently only used in one place: time travel. Here is the relevant code: https://github.com/delta-io/delta-rs/blob/e52f23fd3ac68b86aededff081dffcd86902ecf8/rust/src/delta.rs#L809. The idea is that we can leverage object list and head API calls to perform version filtering by timestamp efficiently, without having to fetch and parse the object content.

@rdettai (Owner, Author)

Thanks @houqp for these pointers, I understand better now. I am really surprised by that behavior 😃. This basically means you cannot copy a delta table around (delta-io/delta#192). I will add back this field right away with a pointer to this issue.

Out of curiosity, I can't find any reference to the usage of the file system / object store timestamp in the Delta documentation or the protocol. Have I missed something? If I had designed this, I would have definitely encoded the timestamp in the log entry key (something like 00000000000000000000-1615043767813.json) 😄

@houqp

That's a good point, yeah, this design is a bit hacky :D This behavior is not documented in either the official doc or the protocol spec; I think I probably got it from reading the Scala reference implementation.

I think the reason they went with version.json instead of version-ts.json was so that they could easily fetch a version commit object directly with a get API call, without having to do a list API call first. But I agree with you that it's not ideal to depend on object modification time in this particular design; tracking the version timestamp in a separate managed metadata folder would have been better, for example a version timestamp checkpoint file of some sort.

let fmeta = fmeta_res?;
let reader = self
.object_store_registry
.get_by_uri(&fmeta.path)?
@houqp

Given that SizedFileStream is returned from the object store list call, I believe its paths would not contain a URI scheme, so get_by_uri will always return the local file store? A naming convention that could be useful to establish within the code base would be to use uri for strings that may contain a scheme, and path for the relative path component of the uri, i.e. without the scheme.
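
A possible shape for that convention (extract_scheme is a hypothetical helper, not code from this PR): a uri may carry a scheme, a path never does, and schemeless strings fall back to the local file store.

fn extract_scheme(uri: &str) -> (&str, &str) {
    match uri.split_once("://") {
        Some((scheme, path)) => (scheme, path),
        None => ("file", uri), // no scheme: assume the local file store
    }
}

fn main() {
    assert_eq!(extract_scheme("s3://bucket/key"), ("s3", "bucket/key"));
    assert_eq!(extract_scheme("/tmp/data.csv"), ("file", "/tmp/data.csv"));
}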

@rdettai (Owner, Author)

Very good point. It wasn't clear to me that the values returned from list calls wouldn't be URIs. I'll try to make that clearer.

@yjshen commented Sep 26, 2021

Thanks for working on this! Will start my review tomorrow morning.

Making chunk_reader return Read instead of AsyncRead feels like a step backward to me. I know that all our current readers rely on Read, as you mentioned. How about changing them to AsyncRead one by one, as suggested by @alamb in apache/arrow-rs#111?

Would https://github.com/cloudfuse-io/cloud-readers-rs bring extra benefits that outweigh the awkward async->sync->async transition?

@alamb left a comment

This is very cool @rdettai . Thank you for the PR.

I think the core problem we face is that the CsvReader, ParquetReader, etc. are not (yet) async

I agree with @yjshen that making chunk_reader sync seems like a step backwards as it would require changing again once we do have proper async readers.

One reason that the structure described in https://github.com/cloudfuse-io/cloud-readers-rs (great pictures, btw) may not be ideal is that I think it will end up blocking a thread on the tokio threadpool (waiting on the mutex) while data is fetched from the network.

I suspect any use of cloud-readers-rs would require at least 2 threads in the tokio threadpool to avoid deadlocks

An alternative strategy might be some "bridging" code between now and when we get async versions:

  1. Add a function like pub fn get_local_file(filepath: SizedFile) -> File to the ObjectStore
  2. The local file store just passes along the underlying file
  3. Remote ObjectStores could read the data from the object into some temporary file (a sketch follows below)

Once arrow has a proper async ParquetReader, CSV reader, etc., we could change the DataFusion implementations to use that.

Using a temporary file strategy is a different tradeoff than cloud-readers-rs -- it would potentially require more network (S3 bytes) but avoid a potential deadlock.

BTW the "buffer to temporary files" strategy is effectively the approach we use in IOx to read parquet files from S3 (source link).
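
A minimal sketch of that temporary-file bridge (buffer_to_temp_file is hypothetical and assumes the tempfile crate; the real get_local_file would live on ObjectStore):

use std::fs::File;
use std::io::{self, Read, Seek, SeekFrom};

fn buffer_to_temp_file(mut remote: Box<dyn Read + Send + Sync>) -> io::Result<File> {
    let mut tmp = tempfile::tempfile()?; // anonymous file, deleted on drop
    io::copy(&mut remote, &mut tmp)?;    // fetch the whole object up front
    tmp.seek(SeekFrom::Start(0))?;       // rewind so reads start at byte 0
    Ok(tmp)
}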


#[cfg(feature = "avro")]
/// Read Avro schema given a reader
- pub fn read_avro_schema_from_reader<R: Read + Seek>(reader: &mut R) -> Result<Schema> {
+ pub fn read_avro_schema_from_reader<R: Read>(reader: &mut R) -> Result<Schema> {
@alamb

👍 nice cleanup (and makes this code easier to use)

@@ -35,25 +35,69 @@ use crate::physical_plan::Statistics;

/// Character Separated Value `FileFormat` implementation.
pub struct CsvFormat {
has_header: bool,
@alamb

The doc strings seem to have been lost from this structure

@rdettai (Owner, Author)

My understanding is that docstrings are useful on public fields only: private field documentation will not be used by the HTML generator anyway; the fields just appear as /* fields omitted */ (example here). This is why I moved the documentation to the public with_xxx builder methods (sketched below). Is there a more idiomatic way to do this? Should we leave these fields public?
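
A minimal sketch of the pattern being described, reusing the CsvFormat field shown above (with_has_header is illustrative; the exact builder names may differ):

/// Character Separated Value `FileFormat` implementation.
pub struct CsvFormat {
    // Private: rustdoc renders private fields as /* fields omitted */,
    // so the documentation lives on the public builder method instead.
    has_header: bool,
}

impl CsvFormat {
    /// Set whether the CSV files start with a header row.
    pub fn with_has_header(mut self, has_header: bool) -> Self {
        self.has_header = has_header;
        self
    }
}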

@rdettai (Owner, Author) commented Sep 26, 2021

Hi all, thanks for your comments! I think this is still a work in progress and requires some more thought. Sadly I will have limited bandwidth this week, so I will only pick it up again next week!

@alamb indeed, to avoid blocking the tokio scheduler we need to wrap all blocking calls with spawn_blocking, along these lines:
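
A minimal sketch (read_chunk is hypothetical; sync_chunk_reader is the method from this PR): the blocking read runs on tokio's dedicated blocking pool, so the async scheduler threads are never stalled on I/O.

use std::io::Read;

async fn read_chunk(
    reader: Box<dyn Read + Send + Sync>, // from ObjectReader::sync_chunk_reader
    length: usize,
) -> std::io::Result<Vec<u8>> {
    tokio::task::spawn_blocking(move || {
        let mut buf = Vec::with_capacity(length);
        reader.take(length as u64).read_to_end(&mut buf)?;
        Ok(buf)
    })
    .await
    .expect("blocking task panicked")
}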

@houqp commented Sep 26, 2021

An alternative that reduces the scope of the PR is to introduce a set of new sync methods on the object store trait instead of replacing the existing ones, e.g. sync_chunk_reader. Then we can keep using the sync version in our existing code base and switch to the native async version when we have async IO added to arrow-rs for parquet. I think making the file formats async might be something that deserves its own set of PRs, even outside of apache#1010.

If we are to release our object store abstraction as a reusable separate crate to the rest of the Rust community in the future, which I hope we will :), I think having both sync and async trait methods will make it easier to use as well.

@alamb commented Sep 27, 2021

If we are to release our object store abstraction as a reusable separate crate to the rest of the Rust community in the future, which I hope we will :), I think having both sync and async trait methods will make it easier to use as well.

I agree -- this would be very cool.

@rdettai (Owner, Author) commented Sep 30, 2021

Sorry again for the lack of bandwidth to push this forward this week, and more importantly, thanks again for your precious review time 😄

introduce a set of new sync methods to the object store trait instead of replacing the existing one

I agree that we might need an abstraction that is capable of both at some point in time, and I fear that it will be very hard to maintain 😧. I feel it will create a sort of compatibility matrix between object stores and datasources:

  • cloud stores will be harder to implement in a sync way, while file systems might be more performant if left sync (managing the threadpool ourselves instead of letting tokio do it),
  • datasources will partly use the sync interface and partly the async interface.

If we introduce this dual interface right now, we will have half-unused and unimplemented methods hanging around for a long time. Are you already relying on the ObjectStore abstraction in external crates? In some currently open pull request? Otherwise, my suggestion would be to make the ObjectStore "sync only" for now (I don't mind renaming it to sync_chunk_reader) and add back the async method once the async arrow parquet/csv/json readers are ready and we start migrating to them... But if you really prefer leaving the async method in, unimplemented and unused, I can also accommodate that 👍

@houqp commented Oct 1, 2021

Thanks @rdettai for the link to the fs async performance problem, I was not aware of this leaky abstraction before :)

I fear that it will be very hard to maintain it

I share the same concern with you. Just to be clear, I think within datafusion, we should just use async everywhere, I was thinking the sync interface could be left to other projects that really need sync interfaces. In the short run, we will have a mix of sync and async in datafusion's datasource module. I feel like that's unavoidable as we will be migrating parquet/csv/json readers to async one at a time. Or it's also possible that we will be able to merge the arrow2 branch soon, which means we get async support for all format readers in one go :)

Are you already relying on the ObjectStore abstraction in external crates? In some currently opened pull request?

I don't think there are a lot, but I am aware of @yjshen's native Spark executor work, which depends on an async hdfs object store: https://github.com/yjshen/hdfs-native/blob/605ef1d29ee5a216865d5905d668498b4387dcd4/src/hdfs_store.rs#L203. That said, given the current stage of the project, I am not that concerned about introducing breaking changes as long as those changes are justified.

I would vote for us keeping async here for the following two reasons:

  1. @yjshen has plans to leverage this for his native Spark executor work soon, as in weeks, so we might as well keep it here.
  2. We know we will be using async as the API in datafusion in the long run, so keeping it here now reduces code churn.

@rdettai force-pushed the refacto-providers branch from f85f66e to 4d92231 on Oct 4, 2021
@rdettai force-pushed the refacto-providers-store branch from 042a33b to bc82996 on Oct 4, 2021
@rdettai (Owner, Author) commented Oct 5, 2021

@alamb @yjshen @houqp I have made some changes to accommodate your comments and improve the design a bit. I am sorry if there is a bit of back and forth in the commits, I had a lot of hesitations. Most important changes since your last reviews:

  • I have removed the object store reference from the FileFormat. This will help the serde of the logical plan. I currently cache it in the ListingTable provider, but I think it should be provided when scan() is called instead. I just don't want to change the TableProvider trait for now, as it implies a lot of refactoring. I would prefer to first merge this back into the original PR.
  • Similarly, I would like to also take the object store reference out of the ExecutionPlan trait object structs, to make them more easily serializable. What do you think about adding ObjectStoreRegistry as a parameter to ExecutionPlan.execute() (sketched after this list)?
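
The proposal in the second bullet would look roughly like this (a hypothetical shape, not the current trait; the stub types stand in for the real ones):

use std::sync::Arc;

struct ObjectStoreRegistry;
struct SendableRecordBatchStream;
type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;

#[async_trait::async_trait]
trait ExecutionPlan: Send + Sync {
    // The registry travels with execute(), so the plan structs stay free
    // of runtime handles and remain easier to serialize.
    async fn execute(
        &self,
        partition: usize,
        object_stores: Arc<ObjectStoreRegistry>,
    ) -> Result<SendableRecordBatchStream>;
    // ...other trait methods elided
}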

@rdettai (Owner, Author) commented Oct 5, 2021

If there is no major feedback, I think I will merge this to apache#1010 tomorrow.

@alamb left a comment

I can't say I reviewed this entire PR, but I looked carefully at how ListingTable works with the ParquetFormat -- and I really like the structure and how it is coming together -- the split of infer_schema and infer_stats is very nice 👌 👨‍🍳

So all in all I would say this looks good to merge to your other branch


use super::object_store::{FileMeta, ObjectReader, ObjectReaderStream, ObjectStore};

/// The configurations to be passed when creating a physical plan for
@alamb

This is a neat idea.

Is the idea that this information will be provided by the DataSource (e.g. the ListingTable)?

If so I think that makes sense as things like the statistics and schema may be known via some other cache / catalog mechanism

/// be analysed up to a given number of records or files (as specified in the
/// format config) then give the estimated common schema. This might fail if
/// the files have schemas that cannot be merged.
async fn infer_schema(&self, readers: ObjectReaderStream) -> Result<SchemaRef>;
@alamb

👍

@alamb commented Oct 5, 2021

I have removed the object store reference from the FileFormat. This will help the serde of the logical plan. I currently cache it in the ListingTable provider, but I think it should be provided when scan() is called instead. I just don't want to change the TableProvider trait for now as it implies a lot of refacto. I would prefer to first merge this back into the original PR.

Sounds like a good plan to me

Similarly, I would like to also take the object store reference out from the ExecutionPlan trait object structs, to make them more easily serializable. What do you think about adding ObjectStoreRegistry as a parameter to ExecutionPlan.execute() ?

Adding an ObjectStoreRegistry to ExecutionPlan.execute doesn't make sense to me, as many ExecutionPlans don't do any object store operations at all (e.g. LimitExec or SortExec).

It would make the most sense to me to add an ObjectStoreRegistry to functions like ListingTable::new() -- though I understand the serialization problem -- it seems like we should "simply" not serialize the ObjectStoreRegistry references and then provide a new ObjectStoreRegistry instance when deserializing in Ballista (sketched below) -- though I realize the current code structure may make this challenging
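
A serde-based illustration of that principle (ListingTableConfig is a hypothetical struct, and the actual plan serialization in Ballista may work differently):

use serde::{Deserialize, Serialize};
use std::sync::Arc;

// Stub standing in for the registry type from this PR.
#[derive(Default)]
struct ObjectStoreRegistry;

#[derive(Serialize, Deserialize)]
struct ListingTableConfig {
    path: String,
    // Never serialized: refilled with a node-local instance (here via
    // Default) when the plan is rebuilt on the deserializing side.
    #[serde(skip)]
    object_store_registry: Arc<ObjectStoreRegistry>,
}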

@houqp left a comment

looks pretty solid 👍

@houqp commented Oct 6, 2021

It would make the most sense to me to add an ObjectStoreRegistry to functions like ListingTable::new() -- though I understand the serialization problem -- it seems like we should "simply" not serialize the ObjectStoreRegistry references and then provide a new ObjectStoreRegistry instance when deserializing in Ballista -- though I realize the current code structure may make this challenging

Reconstruction without serialization becomes harder when the ObjectStoreRegistry can be mutated at runtime. @rdettai, don't we still need to serialize the ObjectStoreRegistry even if it's moved out into an argument of the execute method? How do you envision the executors creating a matching ObjectStoreRegistry?

@alamb commented Oct 6, 2021

How do you envision the executors creating a matching ObjectStoreRegistry?

I was imagining that whatever system started ballista executors would be responsible for setting up a compatible ObjectStoreRegistry and that it probably would not be possible to serialize the existing one.

For example using AWS S3 with instance credentials, I believe the credentials are tied to the actual VM making the request, so the specifics of what was in a configured ObjectStoreRegistry likely have to vary from node to node.

@rdettai (Owner, Author) commented Oct 6, 2021

I was imagining that whatever system started ballista executors would be responsible for setting up a compatible ObjectStoreRegistry

Yes, I was picturing something similar. Also, if you have the different store implementations in different crates, you need a way to compile them into the scheduler/executor anyway! You might as well add all the compiled implementations to the registry automatically. In Spark, for instance, you drop the S3 client jar into the classpath, and it is loaded automatically when you use s3://. Not sure how that would play out in Rust... 😲

  • We could use dylibs, but personally I would prefer to keep the static compilation
  • My suggestion would be to expose the registry as a static for now (see the sketch after this list). We can refer to that instance from anywhere, including Ballista.
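
A minimal sketch of that "registry as static" suggestion, assuming the once_cell crate (names are illustrative, and the stub stands in for the registry type from this PR):

use once_cell::sync::Lazy;
use std::sync::Arc;

struct ObjectStoreRegistry;
impl ObjectStoreRegistry {
    fn new() -> Self {
        ObjectStoreRegistry
    }
}

// One process-wide instance that any component, including Ballista
// schedulers and executors, can reference without it being threaded
// through constructors or serialized plans.
static GLOBAL_OBJECT_STORES: Lazy<Arc<ObjectStoreRegistry>> =
    Lazy::new(|| Arc::new(ObjectStoreRegistry::new()));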

@rdettai merged commit 06155d1 into refacto-providers on Oct 6, 2021
@houqp commented Oct 7, 2021

Agreed, I also think we should keep all object store crates/plugins statically compiled for all binaries within the Ballista cluster for now. I don't really see the need to load a new object store implementation at runtime. Even with Spark, we don't do this in production today.

For AWS specifically, I was thinking of the object store config as which credential provider to use, i.e. IAM profile, instance profile, IAM user keys, etc. I believe the AWS SDK's credential provider chain falls back to the instance profile by default if higher-priority credentials are not discovered. However, one might want to use IAM roles to manage S3 access across the cluster instead. In this case, we need to configure all S3 object stores to use the same IAM role. Perhaps to keep it simple for now, we can set the same convention to make the config also static, i.e. when you spin up the cluster, you are responsible for syncing the object store config across all machines.

@alamb commented Oct 7, 2021

The use case of (potentially) using different credentials for different plans is a good one -- and I agree that in that case the plan itself would need to serialize and send some amount of config.

I like the idea of starting with a static config; we can then add some way to pass per-query configuration information subsequently.

Perhaps this could be encoded in the object store URL somehow, like

s3://my_bucket/my_prefix?IAMRole=my_awesome_role
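
A sketch of pulling such per-query configuration back out of the URL (extract_iam_role is a hypothetical helper using the url crate; IAMRole is the query parameter from the example above):

use url::Url;

fn extract_iam_role(uri: &str) -> Option<String> {
    let url = Url::parse(uri).ok()?;
    url.query_pairs()
        .find(|(key, _)| key == "IAMRole")
        .map(|(_, value)| value.into_owned())
}

fn main() {
    let role = extract_iam_role("s3://my_bucket/my_prefix?IAMRole=my_awesome_role");
    assert_eq!(role.as_deref(), Some("my_awesome_role"));
}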
