Skip to content

Commit

Permalink
consider visibility in hash sender
Browse files Browse the repository at this point in the history
Signed-off-by: Alex Chi <[email protected]>
  • Loading branch information
skyzh committed May 16, 2022
1 parent 85006a6 commit eea253c
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 5 deletions.
11 changes: 9 additions & 2 deletions src/batch/src/task/hash_shuffle_channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@
// limitations under the License.

use std::future::Future;
use std::ops::BitAnd;
use std::option::Option;

use risingwave_common::array::DataChunk;
use risingwave_common::buffer::Bitmap;
use risingwave_common::error::ErrorCode::InternalError;
use risingwave_common::error::{Result, ToRwResult};
use risingwave_common::util::hash_util::CRC32FastBuilder;
Expand Down Expand Up @@ -74,8 +76,13 @@ fn generate_new_data_chunks(
});
let mut res = Vec::with_capacity(output_count);
for (sink_id, vis_map_vec) in vis_maps.into_iter().enumerate() {
let vis_map = (vis_map_vec).try_into()?;
let new_data_chunk = chunk.with_visibility(vis_map).compact()?;
let vis_map: Bitmap = vis_map_vec.try_into()?;
let vis_map = if let Some(visibility) = chunk.get_visibility_ref() {
vis_map.bitand(visibility)?
} else {
vis_map
};
let new_data_chunk = chunk.with_visibility(vis_map);
trace!(
"send to sink:{}, cardinality:{}",
sink_id,
Expand Down
13 changes: 10 additions & 3 deletions src/common/src/buffer/bitmap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,15 +250,21 @@ impl<'a, 'b> BitAnd<&'b Bitmap> for &'a Bitmap {
type Output = Result<Bitmap>;

fn bitand(self, rhs: &'b Bitmap) -> Result<Bitmap> {
Ok(Bitmap::from((&self.bits & &rhs.bits)?))
assert_eq!(self.num_bits, rhs.num_bits);
let mut bitmap = Bitmap::from((&self.bits & &rhs.bits)?);
bitmap.num_bits = self.num_bits;
Ok(bitmap)
}
}

impl<'a, 'b> BitOr<&'b Bitmap> for &'a Bitmap {
type Output = Result<Bitmap>;

fn bitor(self, rhs: &'b Bitmap) -> Result<Bitmap> {
Ok(Bitmap::from((&self.bits | &rhs.bits)?))
assert_eq!(self.num_bits, rhs.num_bits);
let mut bitmap = Bitmap::from((&self.bits | &rhs.bits)?);
bitmap.num_bits = self.num_bits;
Ok(bitmap)
}
}

Expand Down Expand Up @@ -302,7 +308,8 @@ impl TryFrom<&ProstBuffer> for Bitmap {

fn try_from(buf: &ProstBuffer) -> Result<Bitmap> {
let mut builder = BitmapBuilder::default();
buf.get_body().as_slice().iter().for_each(|e| {
let body = buf.get_body().as_slice();
body.iter().for_each(|e| {
builder.append(*e == 1_u8);
});

Expand Down

0 comments on commit eea253c

Please sign in to comment.