-
Notifications
You must be signed in to change notification settings - Fork 421
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: optimistic transaction protocol #632
Conversation
Thank you @roeap for picking up this work! I will take a closer look at it this weekend. 👍 from me for including datafusion-expr if needed. |
The structure looks good to me 👍 When we introduce a proper expression handling abstraction, we should upgrade what we have in partition handling with it as well: delta-rs/rust/src/partitions.rs Line 12 in cc62c49
|
FYI I'm planning on reviewing this weekend. :) |
# Description This PR refactors and extends the table configuration handling. The approach is analogous to how we and object_store handle configuration via storage properties. The idea is to provide a somewhat typed layer over the untyped configuration keys. There was one surprising thing along the way. From what I can tell, we may have been omitting the `delta.` prefix on the config keys we parse. So this would definitely be breaking behaviour, since we no longer recognize keys we were parsing before. We can in principle handle aliases for keys quite easily, but I was not sure what the desired behaviour is. cc @rtyler @xianwill - This change would probably affect `kafka-delta-ingest`, so especially interested in your opinions! # Related Issue(s) part of delta-io#632 # Documentation <!--- Share links to useful documentation --->
# Description Another PR on the road to delta-io#632 - ~~keeping it a draft, as it is based on delta-io#1206~~ While the `commitInfo` action is defined as completely optional, spark and delta-rs write at the very least interesting, but often also quite helpful information into the commit info. To make it easier to work with and centralize some conventions, we introduce a `CommitInfo` struct, that exposes some of the fields at the top level. Additionally we harmonize a bit between spark and delta-rs conventions. # Related Issue(s) part of delta-io#632 # Documentation <!--- Share links to useful documentation ---> --------- Co-authored-by: Will Jones <[email protected]>
pub fn read_whole_table(&self) -> bool { | ||
match self { | ||
// TODO just adding one operation example, as currently none of the | ||
// implemented operations scan the entire table. | ||
Self::Write { predicate, .. } if predicate.is_none() => false, | ||
_ => false, | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The reference implementaition allows for txn to "taint" the entire table, in which case we disregrad analysing the specific actions. The conflict checker covers this behavior in tests, but I haven't investigated yet, when this is actually set. Mainly leaving this fn, as a "reminder" for subsequent PRs soon to come.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does overwrite read the entire table?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Or could you provide examples of which operations will?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Our operations should actually never use this directly right now. I went through the spark code base, and it seems this is invoked mostly in cases when a plan querying multiple tables with spark has a delta table set as a sink.. This usually uses generic plans and does not invoke the delta operations directly, or rather when we are not too sure what actually happened.
Did not get too deep though. Personally I am a bit conflicted on how to proceed with this. on one hand it does nothing useful right now. On the other hand, we can keep it as a reminder to mindful of this. While right now I think datafusion does not yet have the concept of a sink in their plans, I believe they might be added fairly soon. At this point we may encounter situations, where a plan wants to write to delta, but we have no way of knowing what was scanned.
That said, I do hope that we will be able to inspect the plan metrics and figure out what was read anyhow. Especially since we report the scanned files in out scan operator.
&actions, | ||
DeltaOperation::FileSystemCheck {}, | ||
snapshot, | ||
// TODO pass through metadata |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I plan to do a pass through all our operations in the next Pr, where also I'll be focussing on testing the commit
function, and addressing these kinds of TODOs.
pub fn parse_predicate_expression(&self, expr: impl AsRef<str>) -> DeltaResult<Expr> { | ||
let dialect = &GenericDialect {}; | ||
let mut tokenizer = Tokenizer::new(dialect, expr.as_ref()); | ||
let tokens = tokenizer | ||
.tokenize() | ||
.map_err(|err| DeltaTableError::GenericError { | ||
source: Box::new(err), | ||
})?; | ||
let sql = Parser::new(dialect) | ||
.with_tokens(tokens) | ||
.parse_expr() | ||
.map_err(|err| DeltaTableError::GenericError { | ||
source: Box::new(err), | ||
})?; | ||
|
||
// TODO should we add the table name as qualifier when available? | ||
let df_schema = DFSchema::try_from_qualified_schema("", self.arrow_schema()?.as_ref())?; | ||
let context_provider = DummyContextProvider::default(); | ||
let sql_to_rel = SqlToRel::new(&context_provider); | ||
|
||
Ok(sql_to_rel.sql_to_expr(sql, &df_schema, &mut Default::default())?) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Parsing predicate expressions is actually kind of broken. Regardless of the schema we pass in, int fields are always recognized as i64. I hope, that the abstractions in the core project will eventually help us out. Until then I plan to either do a follow-up in re-writing these expressions, or, if this is a bug, addressing this upstream in datafusion.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yet another set of test utilities, and yet another TODO for follow up PR, to consolidate our test helpers...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall looks like a great improvement. I think we want to be more conservative in cases where there are multiple concurrent transactions that beat the candidate one. Also the error messages could be improved.
_ => None, | ||
} | ||
} | ||
|
||
/// Denotes if the operation reads the entire table | ||
pub fn read_whole_table(&self) -> bool { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method and changes_data
makes me wonder whether DeltaOperation
should instead be a trait so that we require each instance to implement.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed! If OK with you, I would defer exploring that to a follow up PR. To really get a feel for how that API should look like, I would like to work a bit more with operations that actually need the conflict resolution :).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That sounds good to me!
pub fn read_whole_table(&self) -> bool { | ||
match self { | ||
// TODO just adding one operation example, as currently none of the | ||
// implemented operations scan the entire table. | ||
Self::Write { predicate, .. } if predicate.is_none() => false, | ||
_ => false, | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does overwrite read the entire table?
// add / read + no write | ||
// transaction would have read file added by concurrent txn, but does not write data, | ||
// so no real conflicting change even though data was added |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does read + no write
mean no data change? like an optimize?
version_to_read += 1; | ||
} | ||
|
||
// TODO how to handle commit info for multiple read commits? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, if there are multiple versions, should we just error for now? If there are multiple commits that beat our candidate, it seems like we should generate a WinningCommitSummary for each transaction after read_version
, right? And then checking our candidate commit against each of them?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point. Changed the logic, that out commit loop will always only check against the currently conflicting commit and as such the WinningCommitSummary
will now summarize exactly one commit.
pub fn read_whole_table(&self) -> bool { | ||
match self { | ||
// TODO just adding one operation example, as currently none of the | ||
// implemented operations scan the entire table. | ||
Self::Write { predicate, .. } if predicate.is_none() => false, | ||
_ => false, | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Or could you provide examples of which operations will?
@wjones127 - sorry for taking a long time to incorporate your feedback. I think it's mostly done now. Mainly we now always check against the next version of commits. The questions around As I plus I re-enabled an existing test for optimise, which we can now support again. |
I am realizing we have another transaction implementation called So I think we need to consolidate those into |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is looking good now.
The PySpark integration tests will be fixed soon. I should take a look at the sporadic failures in the Azure tests.
Yes it does. So far I have deliberately kept the implementation of commits used in the operations module separate from the existing one on the table. Where I am not sure is if we need the |
Description
This PR adds a
ConflictChecker
struct for conflict resolution in cases of concurrent commit failures. The implementation is heavily inspired by the reference implementation. So far we have most tests from spark that specifically target conflict resolution covered.Working on this I thought a bit about what we may consider going forward, as we move through the protocol versions :). In the end we could end up with three main structs that are involved in validating a commit.
DataChecker
, which validates and potentially mutates data when writing data files to disk. (Currently supports invariants)ConflictChecker
, which checks if a commit can be re-tried in case of commit conflicts.CommitChecker
, which does a-priory validation of the commit itself (e.g. append only and other rules covered by tests in spark)My hope is to get this PR merged right after we release
0.8.0
, so there is some time to fill some holes and fully leverage the new feature for0.9.0
.If folks agree, I would open some issues and start work on some follow-ups..
Follow-ups
ConflictChecker
support conflict resolution for streaming transactionsCommitChecker
DataChecker
.Related Issue(s)
part of #593
Documentation