feat: check constraints #1915
ACTION NEEDED: delta-rs follows the Conventional Commits specification for release automation. The PR title and description are used as the merge commit message. Please update your PR title and description to match the specification.
@hntd187 does the Delta protocol allow adding constraints during the create operation?
I'm not sure; the documentation only seems to add them after creation.
Hmm, perhaps we can support that during create actions, then. It just adds the constraints as additional entries in the config, right?
Yeah, doing an initial create and then an alter to add the constraint should be easy, since the table is empty.
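For illustration, a minimal sketch of what "adding the constraint to the config" amounts to. The Delta protocol stores each CHECK constraint in the table metadata configuration under the key `delta.constraints.{name}`, with the SQL expression as the value. The `add_constraint` helper and its plain `String` error type are hypothetical, not delta-rs API:

```rust
use std::collections::HashMap;

/// Hypothetical helper: registers a CHECK constraint in a metadata
/// configuration map using the `delta.constraints.{name}` key format.
fn add_constraint(
    configuration: &mut HashMap<String, String>,
    name: &str,
    expr: &str,
) -> Result<(), String> {
    let key = format!("delta.constraints.{name}");
    // Refuse to silently overwrite an existing constraint of the same name.
    if configuration.contains_key(&key) {
        return Err(format!("constraint '{name}' already exists"));
    }
    configuration.insert(key, expr.to_string());
    Ok(())
}
```

On an empty, freshly created table there is no existing data to validate, so only the metadata update would be needed.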
@hntd187 - could we reduce the number of files in the example table so that it contains only a single add action per commit (unless the case needs more), to keep entropy down?
Thanks for looking into this @hntd187 - left a few comments.
Would it make sense to also make sure we check this on the Python side?
    invariants: Vec<Invariant>,
    ctx: SessionContext,
}

impl DeltaDataChecker {
    /// Create a new DeltaDataChecker with a specified set of invariants
    pub fn with_invariants(invariants: Vec<Invariant>) -> Self {
We usually reserve the `with_*` style syntax for builder-like objects (i.e. when mutating an existing object) and `new_*` for constructor-like functions. Here `new_with_invariants` might work.
Done for both
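To make the naming convention from this thread concrete, here is a small sketch. `DataChecker` and its fields are hypothetical stand-ins for illustration, not the actual `DeltaDataChecker` definition:

```rust
#[derive(Debug, Clone, PartialEq)]
struct Invariant {
    field_name: String,
    invariant_sql: String,
}

#[derive(Debug, Default)]
struct DataChecker {
    invariants: Vec<Invariant>,
    constraints: Vec<String>,
}

impl DataChecker {
    /// Constructor-like: creates a fresh value, hence the `new_*` prefix.
    fn new_with_invariants(invariants: Vec<Invariant>) -> Self {
        Self {
            invariants,
            constraints: Vec::new(),
        }
    }

    /// Builder-like: takes an existing value and returns it modified,
    /// hence the `with_*` prefix.
    fn with_constraints(mut self, constraints: Vec<String>) -> Self {
        self.constraints = constraints;
        self
    }
}
```

With this split, a call site such as `DataChecker::new_with_invariants(invariants).with_constraints(constraints)` reads as "construct, then refine".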
let metadata = snapshot.metadata();

let invariants = metadata
    .and_then(|meta| meta.schema.get_invariants().ok())
Just had a quick look, but I think `get_invariants` only errors when there are invariants defined, but in an illegal format. This is something we should return as an error.
This was copied from where it originally lived, prior to invariant enforcement in the data checker. Since this is not related to check constraints and slightly alters what would normally happen, can we handle it in a follow-up PR?
Sure, but maybe add a TODO?
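As a reference for that follow-up, a sketch of the difference between swallowing the parse error with `.ok()` and propagating it. The `get_invariants` function below, its `field: sql` input format, and `ParseError` are hypothetical stand-ins, not the real schema API:

```rust
#[derive(Debug, PartialEq)]
struct Invariant {
    field: String,
    sql: String,
}

#[derive(Debug, PartialEq)]
struct ParseError(String);

/// Hypothetical parser: entries look like `field: sql`, separated by `;`.
/// It only fails when an entry is present but malformed.
fn get_invariants(raw: &str) -> Result<Vec<Invariant>, ParseError> {
    raw.split(';')
        .filter(|entry| !entry.trim().is_empty())
        .map(|entry| {
            entry
                .split_once(':')
                .map(|(field, sql)| Invariant {
                    field: field.trim().to_string(),
                    sql: sql.trim().to_string(),
                })
                .ok_or_else(|| ParseError(format!("malformed invariant: {entry}")))
        })
        .collect()
}

/// Current behavior: a malformed definition silently becomes "no invariants".
fn invariants_lenient(metadata: Option<&str>) -> Vec<Invariant> {
    metadata
        .and_then(|meta| get_invariants(meta).ok())
        .unwrap_or_default()
}

/// Suggested behavior: missing metadata is fine, a malformed definition is an error.
fn invariants_strict(metadata: Option<&str>) -> Result<Vec<Invariant>, ParseError> {
    Ok(metadata.map(get_invariants).transpose()?.unwrap_or_default())
}
```

`Option::transpose` turns the `Option<Result<..>>` into a `Result<Option<..>>`, so `?` can surface the illegal-format case while missing metadata still yields an empty list.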
    return Ok(());
}

// if !self.ctx.table_exist("data")? {
Remove commented code?
@@ -170,7 +170,7 @@ pub static INSTANCE: Lazy<ProtocolChecker> = Lazy::new(|| {
     let mut writer_features = HashSet::new();
     writer_features.insert(WriterFeatures::AppendOnly);
     writer_features.insert(WriterFeatures::Invariants);
-    // writer_features.insert(WriterFeatures::CheckConstraints);
+    writer_features.insert(WriterFeatures::CheckConstraints);
We missed that for invariants, but should fix that now :).
The `Invariants` and `CheckConstraints` features should only be added if the `datafusion` feature is enabled.
@hntd187 @scarman-db - one last thing: could we hide `Invariants` and `CheckConstraints` behind the `datafusion` feature, as they will not work otherwise?
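A sketch of what this gating could look like, assuming the crate declares a Cargo feature named `datafusion`. The enum and the `supported_writer_features` function are simplified stand-ins for illustration, not the actual `ProtocolChecker` setup:

```rust
use std::collections::HashSet;

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum WriterFeatures {
    AppendOnly,
    Invariants,
    CheckConstraints,
}

/// Hypothetical helper returning the writer features this build can honor.
pub fn supported_writer_features() -> HashSet<WriterFeatures> {
    let mut writer_features = HashSet::new();
    writer_features.insert(WriterFeatures::AppendOnly);
    // Enforcing invariants and CHECK constraints needs an expression engine,
    // so only advertise them when the `datafusion` feature is compiled in.
    #[cfg(feature = "datafusion")]
    {
        writer_features.insert(WriterFeatures::Invariants);
        writer_features.insert(WriterFeatures::CheckConstraints);
    }
    writer_features
}
```

Built without the feature, the set contains only `AppendOnly`, so a table that requires either enforcement feature would be rejected for writes rather than written without the checks.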
if this.name.is_none() {
    return Err(DeltaTableError::Generic("No name provided".to_string()));
} else if this.expr.is_none() {
    return Err(DeltaTableError::Generic(
        "No expression provided".to_string(),
    ));
}
let name = this.name.unwrap();
let expr = match this.expr {
    Some(Expression::String(s)) => s,
    Some(Expression::DataFusion(e)) => e.to_string(),
    None => unreachable!(),
};
Rather than doing the prior checks and then using unwrap / unreachable, could we just return the error in the match statement?
Done
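For reference, the shape suggested in this thread could look roughly like the following. `BuildError`, the `Display` variant, and `resolve` are hypothetical stand-ins for `DeltaTableError`, the DataFusion expression variant, and the builder logic:

```rust
use std::fmt;

#[derive(Debug, PartialEq)]
enum BuildError {
    Generic(String),
}

/// Stand-in for the builder's expression type; `Display` plays the role of
/// any expression that can be rendered to a string.
enum Expression {
    String(String),
    Display(Box<dyn fmt::Display>),
}

/// Validates and extracts both values without `unwrap` or `unreachable!`.
fn resolve(
    name: Option<String>,
    expr: Option<Expression>,
) -> Result<(String, String), BuildError> {
    let name = name.ok_or_else(|| BuildError::Generic("No name provided".to_string()))?;
    let expr = match expr {
        Some(Expression::String(s)) => s,
        Some(Expression::Display(e)) => e.to_string(),
        // The missing case is handled where it is matched.
        None => return Err(BuildError::Generic("No expression provided".to_string())),
    };
    Ok((name, expr))
}
```

Each failure is reported at the point where the value is inspected, so no branch relies on an earlier check having run.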
let files_to_check =
    find_files(&this.snapshot, this.log_store.clone(), &state, None).await?;
let scan = DeltaScanBuilder::new(&this.snapshot, this.log_store.clone(), &state)
    .with_files(&files_to_check.candidates)
    .build()
    .await?;
I think we can just create a scan without explicitly setting files, since it will just scan all files, right?
Correct, still learning about the code base. :-) I removed it
let records = collect_sendable_stream(record_stream).await?;

for batch in records {
    checker.check_batch(&batch).await?;
}
I think it would be better to check each batch as we materialize it, rather than requiring the entire table to be kept in memory for a scan. Similar to how we use the checker when writing data.
I don't have an execution plan here; from what I can tell, I need one to split it all into tasks, similar to what update does. Any suggestion on what to do there?
Should look similar to what we do when we write the stream to disk - sans the writing stuff :)
delta-rs/crates/deltalake-core/src/operations/write.rs
Lines 324 to 351 in e54996a
let mut tasks = vec![];
for i in 0..plan.output_partitioning().partition_count() {
    let inner_plan = plan.clone();
    let inner_schema = schema.clone();
    let task_ctx = Arc::new(TaskContext::from(&state));
    let config = WriterConfig::new(
        inner_schema.clone(),
        partition_columns.clone(),
        writer_properties.clone(),
        target_file_size,
        write_batch_size,
    );
    let mut writer = DeltaWriter::new(object_store.clone(), config);
    let checker_stream = checker.clone();
    let mut stream = inner_plan.execute(i, task_ctx)?;
    let handle: tokio::task::JoinHandle<DeltaResult<Vec<Add>>> =
        tokio::task::spawn(async move {
            while let Some(maybe_batch) = stream.next().await {
                let batch = maybe_batch?;
                checker_stream.check_batch(&batch).await?;
                let arr = cast_record_batch(&batch, inner_schema.clone(), safe_cast)?;
                writer.write(&arr).await?;
            }
            writer.close().await
        });
    tasks.push(handle);
}
To avoid having to iterate through all the partitions in a loop, you could also wrap it in a `CoalesceExec`, where you would always end up with a single partition.
I used exactly this to reflect the ask. :-)
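The memory argument in this thread can be illustrated without any async machinery. The sketch below uses plain iterators and a `Vec<i64>` as a hypothetical stand-in for a record batch; it is not the DataFusion-based code from the PR, only the same idea of checking batches as they arrive instead of collecting them first:

```rust
/// Hypothetical stand-in for an Arrow record batch: a single column of values.
type Batch = Vec<i64>;

#[derive(Debug, PartialEq)]
struct Violation {
    batch_index: usize,
    value: i64,
}

/// Checks batches one at a time as the iterator yields them. Only the current
/// batch is held in memory, and the scan stops at the first violation.
fn check_stream<I>(batches: I, predicate: impl Fn(i64) -> bool) -> Result<usize, Violation>
where
    I: IntoIterator<Item = Batch>,
{
    let mut rows_checked = 0;
    for (batch_index, batch) in batches.into_iter().enumerate() {
        if let Some(&value) = batch.iter().find(|v| !predicate(**v)) {
            return Err(Violation { batch_index, value });
        }
        rows_checked += batch.len();
    }
    Ok(rows_checked)
}
```

In the async version the loop body would sit inside a `while let Some(batch) = stream.next().await` loop, as in the `write.rs` excerpt above.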
let protocol = Protocol {
    min_reader_version: if old_protocol.min_reader_version > 1 {
        old_protocol.min_reader_version
    } else {
        1
    },
    min_writer_version: if old_protocol.min_reader_version > 3 {
        old_protocol.min_reader_version
    } else {
        3
    },
We have some mix-ups here about when to use the reader and writer versions?
Also, maybe add a comment that these are the minimum required versions for the check feature.
Done and done
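One way the corrected logic could be written, using `max` so that each field is only ever compared against itself. The `Protocol` struct here is a reduced stand-in with just the two version fields, and `upgrade_for_check_constraints` is a hypothetical helper name:

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct Protocol {
    min_reader_version: i32,
    min_writer_version: i32,
}

/// Raises the protocol to the least versions required for CHECK constraints
/// (reader version 1, writer version 3) and never downgrades a table that
/// is already on higher versions.
fn upgrade_for_check_constraints(old: Protocol) -> Protocol {
    Protocol {
        min_reader_version: old.min_reader_version.max(1),
        min_writer_version: old.min_writer_version.max(3),
    }
}
```

In the original snippet a table at reader version 1 and writer version 5 would have been moved to writer version 3; with `max` on the writer field it stays at 5.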
let commit_info = CommitInfo {
    timestamp: Some(Utc::now().timestamp_millis()),
    operation: Some("ADD CONSTRAINT".to_string()),
    operation_parameters: Some(operational_parameters),
    read_version: Some(this.snapshot.version()),
    isolation_level: Some(IsolationLevel::Serializable),
    is_blind_append: Some(false),
    ..Default::default()
};
We usually create the `CommitInfo` via the associated `commit_info` method on the `DeltaOperation`.
This doesn't seem better than what I'm doing here; I'd still have to set most of this myself. I took this info from the Spark implementation when I created it. Is that okay?
Yeah, it's fine... the creation of the commit info itself is not so critical.
The only really valuable thing over there is that we have a single place where we can define the string representation for an operation, in this case `ADD CONSTRAINT`. In earlier versions of delta-rs we had some inconsistencies in that regard.
Come to think of it, most information - like `read_version`, `isolation_level` etc. - we should probably set in a central place within the `commit` function anyhow. Especially since there are some cases where we can (and do) relax the isolation level for the actual commit. Should we maybe add an issue for that?
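The "single place for the string representation" point could be sketched as follows. `Operation` is a simplified, hypothetical stand-in for `DeltaOperation`, reduced to two variants:

```rust
/// Simplified stand-in for an operation enum; real operations carry more data.
enum Operation {
    Write,
    AddConstraint { name: String, expr: String },
}

impl Operation {
    /// The one place where each operation's commit-log name is defined,
    /// so call sites never spell the string themselves.
    fn name(&self) -> &'static str {
        match self {
            Operation::Write => "WRITE",
            Operation::AddConstraint { .. } => "ADD CONSTRAINT",
        }
    }
}
```

A commit-info constructor can then take `operation.name()` instead of a literal, which avoids the kind of inconsistency mentioned above.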
)
.await?;

Ok(DeltaTable::new_with_state(this.log_store, this.snapshot))
I don't think this snapshot will reflect the latest table state including the updates we just did, as the `commit` function will not update the snapshot.
So the correct thing here, @roeap, is to do a merge on the states after the commit, right?
Exactly, we should also add the actions we just committed to the table to the new state.
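A rough sketch of that idea: after a successful commit, replay the committed actions onto the in-memory state before handing the table back. `TableState`, `Action`, and `merge_commit` are hypothetical simplifications, not the delta-rs state API:

```rust
use std::collections::HashMap;

#[derive(Debug, Clone, PartialEq)]
enum Action {
    Metadata { configuration: HashMap<String, String> },
    Protocol { min_reader_version: i32, min_writer_version: i32 },
}

#[derive(Debug, Clone, Default, PartialEq)]
struct TableState {
    version: i64,
    configuration: HashMap<String, String>,
    min_reader_version: i32,
    min_writer_version: i32,
}

impl TableState {
    /// Applies the actions of a just-written commit so the in-memory state
    /// matches what is now in the log, without re-reading the log.
    fn merge_commit(&mut self, version: i64, actions: &[Action]) {
        for action in actions {
            match action {
                Action::Metadata { configuration } => {
                    self.configuration = configuration.clone();
                }
                Action::Protocol { min_reader_version, min_writer_version } => {
                    self.min_reader_version = *min_reader_version;
                    self.min_writer_version = *min_writer_version;
                }
            }
        }
        self.version = version;
    }
}
```

The returned table would then be built from the merged state, so the newly added constraint and the bumped protocol are visible to the caller immediately.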
…o not need a physical table, addressed most (but not all) PR comments
Just a minor nit and a small ask, then one more sync with main and we are good to go!
    Action::Protocol(protocol),
];

let _version = commit(
Just a minor nit, but since we use it, there is no more need for the underscore.
Thanks for sticking with it, great work @hntd187!
Description
This PR adds CHECK constraints on delta tables. I still have some outstanding work to do on this, but this is my working draft PR for this feature.
Related Issue(s)
#1881
Documentation