diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/new_transform_partition_bucket.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/new_transform_partition_bucket.rs index 7f40e63ee9e30..f06cdd8e9824d 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/new_transform_partition_bucket.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/new_transform_partition_bucket.rs @@ -19,6 +19,7 @@ use std::marker::PhantomData; use std::sync::Arc; use bumpalo::Bump; +use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_expression::BlockMetaInfoDowncast; use databend_common_expression::DataBlock; @@ -196,6 +197,7 @@ impl Ok(self.initialized_all_inputs) } + #[allow(unused_assignments)] fn add_bucket(&mut self, mut data_block: DataBlock) -> Result<(isize, usize)> { let (mut bucket, mut partition_count) = (0, 0); if let Some(block_meta) = data_block.get_meta() { @@ -277,7 +279,16 @@ impl (payload.bucket, payload.max_partition_count) } }; + } else { + return Err(ErrorCode::Internal(format!( + "Internal, TransformPartitionBucket only recv AggregateMeta, but got {:?}", + block_meta + ))); } + } else { + return Err(ErrorCode::Internal( + "Internal, TransformPartitionBucket only recv DataBlock with meta.", + )); } if self.all_inputs_init {