Skip to content

Commit

Permalink
fix: correct partition-id to field-id in UnboundPartitionField (apach…
Browse files Browse the repository at this point in the history
…e#576)

* correct partition-id to field id in PartitionSpec

* correct partition-id to field id in PartitionSpec

* correct partition-id to field id in PartitionSpec

* xx
  • Loading branch information
FANNG1 authored and shaeqahmed committed Dec 9, 2024
1 parent 97701dd commit 1a414b5
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 49 deletions.
2 changes: 1 addition & 1 deletion crates/iceberg/src/expr/visitors/expression_evaluator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ mod tests {
.add_unbound_fields(vec![UnboundPartitionField::builder()
.source_id(1)
.name("a".to_string())
.partition_id(1)
.field_id(1)
.transform(Transform::Identity)
.build()])
.unwrap()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1661,7 +1661,7 @@ mod test {
.add_unbound_fields(vec![UnboundPartitionField::builder()
.source_id(1)
.name("a".to_string())
.partition_id(1)
.field_id(1)
.transform(Transform::Identity)
.build()])
.unwrap()
Expand Down
6 changes: 3 additions & 3 deletions crates/iceberg/src/expr/visitors/inclusive_projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ mod tests {
UnboundPartitionField::builder()
.source_id(1)
.name("a".to_string())
.partition_id(1)
.field_id(1)
.transform(Transform::Identity)
.build(),
)
Expand Down Expand Up @@ -386,7 +386,7 @@ mod tests {
UnboundPartitionField::builder()
.source_id(3)
.name("name_truncate".to_string())
.partition_id(3)
.field_id(3)
.transform(Transform::Truncate(4))
.build(),
)
Expand Down Expand Up @@ -426,7 +426,7 @@ mod tests {
UnboundPartitionField::builder()
.source_id(1)
.name("a_bucket[7]".to_string())
.partition_id(1)
.field_id(1)
.transform(Transform::Bucket(7))
.build(),
)
Expand Down
78 changes: 37 additions & 41 deletions crates/iceberg/src/spec/partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ pub struct UnboundPartitionField {
/// A partition field id that is used to identify a partition field and is unique within a partition spec.
/// In v2 table metadata, it is unique across all partition specs.
#[builder(default, setter(strip_option))]
pub partition_id: Option<i32>,
pub field_id: Option<i32>,
/// A partition name.
pub name: String,
/// A transform that is applied to the source column to produce a partition value.
Expand Down Expand Up @@ -177,7 +177,7 @@ impl From<PartitionField> for UnboundPartitionField {
fn from(field: PartitionField) -> Self {
UnboundPartitionField {
source_id: field.source_id,
partition_id: Some(field.field_id),
field_id: Some(field.field_id),
name: field.name,
transform: field.transform,
}
Expand Down Expand Up @@ -224,7 +224,7 @@ impl UnboundPartitionSpecBuilder {
) -> Result<Self> {
let field = UnboundPartitionField {
source_id,
partition_id: None,
field_id: None,
name: target_name.to_string(),
transform: transformation,
};
Expand All @@ -246,8 +246,8 @@ impl UnboundPartitionSpecBuilder {
fn add_partition_field_internal(mut self, field: UnboundPartitionField) -> Result<Self> {
self.check_name_set_and_unique(&field.name)?;
self.check_for_redundant_partitions(field.source_id, &field.transform)?;
if let Some(partition_id) = field.partition_id {
self.check_partition_id_unique(partition_id)?;
if let Some(partition_field_id) = field.field_id {
self.check_partition_id_unique(partition_field_id)?;
}
self.fields.push(field);
Ok(self)
Expand Down Expand Up @@ -331,7 +331,7 @@ impl<'a> PartitionSpecBuilder<'a> {
.id;
let field = UnboundPartitionField {
source_id,
partition_id: None,
field_id: None,
name: target_name.into(),
transform,
};
Expand All @@ -341,15 +341,15 @@ impl<'a> PartitionSpecBuilder<'a> {

/// Add a new partition field to the partition spec.
///
/// If `partition_id` is set, it is used as the field id.
/// If partition field id is set, it is used as the field id.
/// Otherwise, a new `field_id` is assigned.
pub fn add_unbound_field(mut self, field: UnboundPartitionField) -> Result<Self> {
self.check_name_set_and_unique(&field.name)?;
self.check_for_redundant_partitions(field.source_id, &field.transform)?;
Self::check_name_does_not_collide_with_schema(&field, self.schema)?;
Self::check_transform_compatibility(&field, self.schema)?;
if let Some(partition_id) = field.partition_id {
self.check_partition_id_unique(partition_id)?;
if let Some(partition_field_id) = field.field_id {
self.check_partition_id_unique(partition_field_id)?;
}

// Non-fallible from here
Expand Down Expand Up @@ -387,7 +387,7 @@ impl<'a> PartitionSpecBuilder<'a> {
// we skip it.
let assigned_ids = fields
.iter()
.filter_map(|f| f.partition_id)
.filter_map(|f| f.field_id)
.collect::<std::collections::HashSet<_>>();

fn _check_add_1(prev: i32) -> Result<i32> {
Expand All @@ -401,9 +401,9 @@ impl<'a> PartitionSpecBuilder<'a> {

let mut bound_fields = Vec::with_capacity(fields.len());
for field in fields.into_iter() {
let partition_id = if let Some(partition_id) = field.partition_id {
last_assigned_field_id = std::cmp::max(last_assigned_field_id, partition_id);
partition_id
let partition_field_id = if let Some(partition_field_id) = field.field_id {
last_assigned_field_id = std::cmp::max(last_assigned_field_id, partition_field_id);
partition_field_id
} else {
last_assigned_field_id = _check_add_1(last_assigned_field_id)?;
while assigned_ids.contains(&last_assigned_field_id) {
Expand All @@ -414,7 +414,7 @@ impl<'a> PartitionSpecBuilder<'a> {

bound_fields.push(PartitionField {
source_id: field.source_id,
field_id: partition_id,
field_id: partition_field_id,
name: field.name,
transform: field.transform,
})
Expand Down Expand Up @@ -544,11 +544,7 @@ trait CorePartitionSpecValidator {

/// Check field / partition_id unique within the partition spec if set
fn check_partition_id_unique(&self, field_id: i32) -> Result<()> {
if self
.fields()
.iter()
.any(|f| f.partition_id == Some(field_id))
{
if self.fields().iter().any(|f| f.field_id == Some(field_id)) {
return Err(Error::new(
ErrorKind::DataInvalid,
format!(
Expand Down Expand Up @@ -698,17 +694,17 @@ mod tests {
"spec-id": 1,
"fields": [ {
"source-id": 4,
"partition-id": 1000,
"field-id": 1000,
"name": "ts_day",
"transform": "day"
}, {
"source-id": 1,
"partition-id": 1001,
"field-id": 1001,
"name": "id_bucket",
"transform": "bucket[16]"
}, {
"source-id": 2,
"partition-id": 1002,
"field-id": 1002,
"name": "id_truncate",
"transform": "truncate[4]"
} ]
Expand All @@ -719,17 +715,17 @@ mod tests {
assert_eq!(Some(1), partition_spec.spec_id);

assert_eq!(4, partition_spec.fields[0].source_id);
assert_eq!(Some(1000), partition_spec.fields[0].partition_id);
assert_eq!(Some(1000), partition_spec.fields[0].field_id);
assert_eq!("ts_day", partition_spec.fields[0].name);
assert_eq!(Transform::Day, partition_spec.fields[0].transform);

assert_eq!(1, partition_spec.fields[1].source_id);
assert_eq!(Some(1001), partition_spec.fields[1].partition_id);
assert_eq!(Some(1001), partition_spec.fields[1].field_id);
assert_eq!("id_bucket", partition_spec.fields[1].name);
assert_eq!(Transform::Bucket(16), partition_spec.fields[1].transform);

assert_eq!(2, partition_spec.fields[2].source_id);
assert_eq!(Some(1002), partition_spec.fields[2].partition_id);
assert_eq!(Some(1002), partition_spec.fields[2].field_id);
assert_eq!("id_truncate", partition_spec.fields[2].name);
assert_eq!(Transform::Truncate(4), partition_spec.fields[2].transform);

Expand All @@ -746,7 +742,7 @@ mod tests {
assert_eq!(None, partition_spec.spec_id);

assert_eq!(4, partition_spec.fields[0].source_id);
assert_eq!(None, partition_spec.fields[0].partition_id);
assert_eq!(None, partition_spec.fields[0].field_id);
assert_eq!("ts_day", partition_spec.fields[0].name);
assert_eq!(Transform::Day, partition_spec.fields[0].transform);
}
Expand Down Expand Up @@ -963,14 +959,14 @@ mod tests {
PartitionSpec::builder(&schema)
.add_unbound_field(UnboundPartitionField {
source_id: 1,
partition_id: Some(1000),
field_id: Some(1000),
name: "id".to_string(),
transform: Transform::Identity,
})
.unwrap()
.add_unbound_field(UnboundPartitionField {
source_id: 2,
partition_id: Some(1000),
field_id: Some(1000),
name: "id_bucket".to_string(),
transform: Transform::Bucket(16),
})
Expand Down Expand Up @@ -1004,22 +1000,22 @@ mod tests {
source_id: 1,
name: "id".to_string(),
transform: Transform::Identity,
partition_id: Some(1012),
field_id: Some(1012),
})
.unwrap()
.add_unbound_field(UnboundPartitionField {
source_id: 2,
name: "name_void".to_string(),
transform: Transform::Void,
partition_id: None,
field_id: None,
})
.unwrap()
// Should keep its ID even if its lower
.add_unbound_field(UnboundPartitionField {
source_id: 3,
name: "year".to_string(),
transform: Transform::Year,
partition_id: Some(1),
field_id: Some(1),
})
.unwrap()
.build()
Expand Down Expand Up @@ -1090,7 +1086,7 @@ mod tests {
.with_spec_id(1)
.add_unbound_field(UnboundPartitionField {
source_id: 1,
partition_id: None,
field_id: None,
name: "id".to_string(),
transform: Transform::Bucket(16),
})
Expand Down Expand Up @@ -1123,7 +1119,7 @@ mod tests {
.with_spec_id(1)
.add_unbound_field(UnboundPartitionField {
source_id: 1,
partition_id: None,
field_id: None,
name: "id".to_string(),
transform: Transform::Identity,
})
Expand All @@ -1136,7 +1132,7 @@ mod tests {
.with_spec_id(1)
.add_unbound_field(UnboundPartitionField {
source_id: 2,
partition_id: None,
field_id: None,
name: "id".to_string(),
transform: Transform::Identity,
})
Expand Down Expand Up @@ -1171,13 +1167,13 @@ mod tests {
.add_unbound_fields(vec![
UnboundPartitionField {
source_id: 1,
partition_id: None,
field_id: None,
name: "id_bucket".to_string(),
transform: Transform::Bucket(16),
},
UnboundPartitionField {
source_id: 2,
partition_id: None,
field_id: None,
name: "name".to_string(),
transform: Transform::Identity,
},
Expand All @@ -1192,13 +1188,13 @@ mod tests {
.add_unbound_fields(vec![
UnboundPartitionField {
source_id: 1,
partition_id: None,
field_id: None,
name: "id_bucket".to_string(),
transform: Transform::Bucket(16),
},
UnboundPartitionField {
source_id: 4,
partition_id: None,
field_id: None,
name: "name".to_string(),
transform: Transform::Identity,
},
Expand Down Expand Up @@ -1237,7 +1233,7 @@ mod tests {
.with_spec_id(1)
.add_unbound_field(UnboundPartitionField {
source_id: 1,
partition_id: None,
field_id: None,
name: "id_year".to_string(),
transform: Transform::Year,
})
Expand All @@ -1250,7 +1246,7 @@ mod tests {
.with_spec_id(1)
.add_partition_fields(vec![UnboundPartitionField {
source_id: 1,
partition_id: None,
field_id: None,
name: "id_bucket[16]".to_string(),
transform: Transform::Bucket(16),
}])
Expand All @@ -1261,7 +1257,7 @@ mod tests {
spec_id: Some(1),
fields: vec![UnboundPartitionField {
source_id: 1,
partition_id: None,
field_id: None,
name: "id_bucket[16]".to_string(),
transform: Transform::Bucket(16),
}]
Expand Down
6 changes: 3 additions & 3 deletions crates/iceberg/src/spec/table_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1293,7 +1293,7 @@ mod tests {
name: "x".to_string(),
transform: Transform::Identity,
source_id: 1,
partition_id: Some(1000),
field_id: Some(1000),
})
.unwrap()
.build()
Expand Down Expand Up @@ -1416,7 +1416,7 @@ mod tests {
name: "x".to_string(),
transform: Transform::Identity,
source_id: 1,
partition_id: Some(1000),
field_id: Some(1000),
})
.unwrap()
.build()
Expand Down Expand Up @@ -1496,7 +1496,7 @@ mod tests {
name: "x".to_string(),
transform: Transform::Identity,
source_id: 1,
partition_id: Some(1000),
field_id: Some(1000),
})
.unwrap()
.build()
Expand Down

0 comments on commit 1a414b5

Please sign in to comment.