fix(streaming): revert "feat(streaming): split chunks in source executor" (#2608)

Revert "feat(streaming): split chunks in source executor (#2361)"

This reverts commit 9fd7119.

Co-authored-by: Alex Chi <[email protected]>
tabVersion and skyzh authored May 18, 2022
1 parent 0c160b7 commit b4ead12
Showing 2 changed files with 6 additions and 213 deletions.
167 changes: 0 additions & 167 deletions src/common/src/array/data_chunk.rs
@@ -348,97 +348,6 @@ impl DataChunk {
Ok(new_chunks)
}

/// `rechunk_head` splits a chunk of `head_chunk_size` rows off the head of a vector of
/// chunks. When the total cardinality of all the chunks is smaller than `head_chunk_size`,
/// a single merged chunk is returned. Otherwise, the head chunk and the remainder chunks
/// are returned in a vector. (A simplified model of this behavior follows the function
/// below.)
///
/// Currently, `rechunk_head` ignores the visibility map; support may be added later
/// depending on demand.
pub fn rechunk_head(chunks: Vec<DataChunk>, head_chunk_size: usize) -> Result<Vec<DataChunk>> {
assert!(head_chunk_size > 0);
// Corner case: some of the `chunks` may have zero physical length; remove them
// here rather than skipping them in the loop below.
let chunks = chunks
.into_iter()
.filter(|chunk| chunk.capacity() != 0)
.collect::<Vec<_>>();
if chunks.is_empty() {
return Ok(Vec::new());
}
assert!(!chunks[0].columns.is_empty());

let total_capacity = chunks
.iter()
.map(|chunk| chunk.capacity())
.reduce(|x, y| x + y)
.unwrap();

// the idx of `chunks`
let mut chunk_idx = 0;
// how many rows does this new chunk need?
let mut new_chunk_require = std::cmp::min(total_capacity, head_chunk_size);
let mut array_builders: Vec<ArrayBuilderImpl> = chunks[0]
.columns
.iter()
.map(|col| col.array_ref().create_builder(new_chunk_require))
.try_collect()?;
let mut end_row_idx = 0;
let mut capacity = 0;
while chunk_idx < chunks.len() {
capacity = chunks[chunk_idx].capacity();
let actual_acquire = std::cmp::min(new_chunk_require, capacity);
end_row_idx = actual_acquire - 1;
array_builders
.iter_mut()
.zip_eq(chunks[chunk_idx].columns())
.try_for_each(|(builder, column)| {
let mut array_builder = column.array_ref().create_builder(end_row_idx + 1)?;
for row_idx in 0..=end_row_idx {
array_builder.append_datum_ref(column.array_ref().value_at(row_idx))?;
}
builder.append_array(&array_builder.finish()?)
})?;
chunk_idx += 1;

new_chunk_require -= actual_acquire;
// a new chunk receives enough rows, finalize it
if new_chunk_require == 0 {
break;
}
}

let new_columns: Vec<Column> = array_builders
.drain(..)
.map(|builder| {
let array = builder.finish()?;
Ok::<_, RwError>(Column::new(Arc::new(array)))
})
.try_collect()?;

// Build return chunks, containing the head chunk and the remainder chunks.
let data_chunk = DataChunk::builder().columns(new_columns).build();
let mut data_chunks = vec![data_chunk];
if end_row_idx + 1 < capacity {
let remainder_columns: Vec<Column> = chunks[chunk_idx - 1]
.columns
.iter()
.map(|col| {
let mut builder = col.array_ref().create_builder(capacity - end_row_idx - 1)?;
for row_idx in (end_row_idx + 1)..capacity {
builder.append_datum_ref(col.array_ref().value_at(row_idx))?;
}
let array = builder.finish()?;
Ok::<_, RwError>(Column::new(Arc::new(array)))
})
.try_collect()?;
data_chunks.push(DataChunk::builder().columns(remainder_columns).build());
}
data_chunks.extend(chunks[chunk_idx..].to_vec());
Ok(data_chunks)
}
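
For readers of this revert, the following is a minimal, self-contained model of what the removed rechunk_head did, with Vec<i32> standing in for a single-column DataChunk. The name rechunk_head_model and the simplified types are illustrative assumptions, not part of the real API; the real implementation copied rows through per-column array builders, but the splitting arithmetic is the same.

// A minimal model of the removed `rechunk_head`, with Vec<i32> standing in
// for a single-column DataChunk. Illustrative only, not the real API.
fn rechunk_head_model(chunks: Vec<Vec<i32>>, head_chunk_size: usize) -> Vec<Vec<i32>> {
    assert!(head_chunk_size > 0);
    // Mirror the corner case in the real code: drop zero-length chunks up front.
    let mut chunks: Vec<Vec<i32>> =
        chunks.into_iter().filter(|c| !c.is_empty()).collect();
    if chunks.is_empty() {
        return Vec::new();
    }
    let total: usize = chunks.iter().map(|c| c.len()).sum();
    let head_size = head_chunk_size.min(total);

    let mut head = Vec::with_capacity(head_size);
    let mut remainder = Vec::new();
    let mut rest = chunks.drain(..);
    // Consume whole chunks (and at most part of one) until the head is full.
    for mut chunk in rest.by_ref() {
        let need = head_size - head.len();
        if chunk.len() <= need {
            head.append(&mut chunk);
        } else {
            // Split this chunk: its tail becomes the remainder chunk.
            let tail = chunk.split_off(need);
            head.append(&mut chunk);
            remainder.push(tail);
        }
        if head.len() == head_size {
            break;
        }
    }
    let mut out = vec![head];
    out.append(&mut remainder);
    // Chunks the head never touched pass through unchanged.
    out.extend(rest);
    out
}

fn main() {
    // Two chunks of 3 rows each, head of 4 rows:
    // prints [[0, 1, 2, 3], [4, 5]].
    let chunks = vec![vec![0, 1, 2], vec![3, 4, 5]];
    println!("{:?}", rechunk_head_model(chunks, 4));
}
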

pub fn get_hash_values<H: BuildHasher>(
&self,
column_idxes: &[usize],
@@ -728,82 +637,6 @@ mod tests {
test_case(10, 10, 7);
}

#[test]
fn test_rechunk_head() {
let test_case = |num_chunks: usize, chunk_size: usize, head_chunk_size: usize| {
let mut chunks = vec![];
for chunk_idx in 0..num_chunks {
let mut builder = PrimitiveArrayBuilder::<i32>::new(0).unwrap();
for i in chunk_size * chunk_idx..chunk_size * (chunk_idx + 1) {
builder.append(Some(i as i32)).unwrap();
}
let chunk = DataChunk::builder()
.columns(vec![Column::new(Arc::new(
builder.finish().unwrap().into(),
))])
.build();
chunks.push(chunk);
}

let total_size = num_chunks * chunk_size;
let (head_remainder_size, remainder_chunk_size, mut chunk_sizes) =
if chunk_size > 0 && total_size > 0 {
if head_chunk_size >= total_size {
(0, 0, vec![total_size])
} else {
let head_remainder_size =
(chunk_size - head_chunk_size % chunk_size) % chunk_size;
(
head_remainder_size,
(total_size - head_remainder_size - head_chunk_size) / chunk_size,
vec![head_chunk_size],
)
}
} else {
(0, 0, vec![])
};
if head_remainder_size > 0 {
chunk_sizes.push(head_remainder_size);
}
chunk_sizes.extend(vec![chunk_size; remainder_chunk_size]);

let new_chunks = DataChunk::rechunk_head(chunks, head_chunk_size).unwrap();
assert_eq!(new_chunks.len(), chunk_sizes.len());
// check cardinality
for (idx, chunk_size) in chunk_sizes.iter().enumerate() {
assert_eq!(*chunk_size, new_chunks[idx].capacity());
}

let mut chunk_idx = 0;
let mut cur_idx = 0;
for val in 0..total_size {
if cur_idx >= chunk_sizes[chunk_idx] {
cur_idx = 0;
chunk_idx += 1;
}
assert_eq!(
new_chunks[chunk_idx]
.column_at(0)
.array()
.as_int32()
.value_at(cur_idx)
.unwrap(),
val as i32
);
cur_idx += 1;
}
};

test_case(0, 0, 1);
test_case(0, 10, 1);
test_case(10, 0, 1);
test_case(1, 1, 6);
test_case(1, 10, 11);
test_case(2, 3, 6);
test_case(5, 5, 6);
test_case(10, 10, 7);
}
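
To make the expected-size arithmetic in the test above concrete, take test_case(5, 5, 6): total_size = 25, head_remainder_size = (5 - 6 % 5) % 5 = 4, and remainder_chunk_size = (25 - 4 - 6) / 5 = 3, giving chunk_sizes = [6, 4, 5, 5, 5]: a 6-row head, the 4 leftover rows of the partially consumed second input chunk, and the three untouched 5-row input chunks.
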

#[test]
fn test_chunk_iter() {
let num_of_columns: usize = 2;
52 changes: 6 additions & 46 deletions src/stream/src/executor/source.rs
@@ -20,7 +20,7 @@ use futures::stream::{select_with_strategy, PollNext};
use futures::{Stream, StreamExt};
use futures_async_stream::try_stream;
use risingwave_common::array::column::Column;
use risingwave_common::array::{ArrayBuilder, ArrayImpl, I64ArrayBuilder, Op, StreamChunk};
use risingwave_common::array::{ArrayBuilder, ArrayImpl, I64ArrayBuilder, StreamChunk};
use risingwave_common::catalog::{ColumnId, Schema, TableId};
use risingwave_common::error::ErrorCode::InternalError;
use risingwave_common::error::{Result, RwError};
@@ -254,51 +254,11 @@ impl<S: StateStore> SourceExecutor<S> {
chunk = self.refill_row_id_column(chunk);
}

let (data, ops) = chunk.into_parts();
let mut chunks = DataChunk::rechunk(&[data], PROCESSING_WINDOW_SIZE)
.unwrap()
.into_iter();
let mut ops_batch = ops.chunks(PROCESSING_WINDOW_SIZE);
let mut next_chunk = None;
let mut next_ops: Option<Vec<Op>> = None;
loop {
let (mut chunk, mut ops) = {
if let Some(chunk) = std::mem::take(&mut next_chunk) {
let ops = std::mem::take(&mut next_ops).unwrap();
(chunk, ops.to_vec())
} else if let Some(chunk) = chunks.next() {
(chunk, ops_batch.next().unwrap().to_vec())
} else {
break;
}
};

// A paired `UpdateDelete` and `UpdateInsert` may be split at the chunk border,
// so we re-chunk the current stream chunk together with the next one (a
// standalone sketch of this pairing rule follows the diff).
if ops[ops.len() - 1] == Op::UpdateDelete {
let pair_chunk = chunks.next().unwrap();
let pair_ops = ops_batch.next().unwrap();
assert!(pair_ops[0] == Op::UpdateInsert);

let head_chunk_size = chunk.capacity() + 1;
let mut new_chunks =
DataChunk::rechunk_head(vec![chunk, pair_chunk], head_chunk_size)
.map_err(StreamExecutorError::eval_error)?;
assert_eq!(new_chunks.len(), 2);

chunk = std::mem::take(&mut new_chunks[0]);
ops.push(pair_ops[0]);
next_chunk = Some(std::mem::take(&mut new_chunks[1]));
next_ops = Some(pair_ops[1..].to_vec());
}

let stream_chunk = StreamChunk::from_parts(ops, chunk);
self.metrics
.source_output_row_count
.with_label_values(&[self.source_identify.as_str()])
.inc_by(stream_chunk.cardinality() as u64);
yield Message::Chunk(stream_chunk);
}
self.metrics
.source_output_row_count
.with_label_values(&[self.source_identify.as_str()])
.inc_by(chunk.cardinality() as u64);
yield Message::Chunk(chunk);
}
}
}
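
The loop removed above existed to split each source chunk into PROCESSING_WINDOW_SIZE windows without separating an UpdateDelete from its paired UpdateInsert. Below is a minimal sketch of that pairing rule, assuming a simplified Op enum and plain vectors in place of the real executor types; split_ops is an illustrative name, not RisingWave API.

// Ops carried alongside rows in a stream chunk. This simplified enum and the
// function below are a sketch, not the real executor types.
#[derive(Clone, Copy, PartialEq, Debug)]
enum Op {
    Insert,
    Delete,
    UpdateDelete,
    UpdateInsert,
}

// Split ops into windows of `window_size` rows, extending a window by one row
// whenever it would otherwise end on UpdateDelete and orphan its UpdateInsert.
fn split_ops(ops: &[Op], window_size: usize) -> Vec<Vec<Op>> {
    assert!(window_size > 0);
    let mut out = Vec::new();
    let mut start = 0;
    while start < ops.len() {
        let mut end = (start + window_size).min(ops.len());
        // Keep the update pair together across the window border.
        if end < ops.len() && ops[end - 1] == Op::UpdateDelete {
            end += 1;
        }
        out.push(ops[start..end].to_vec());
        start = end;
    }
    out
}

fn main() {
    use Op::*;
    let ops = [Insert, Insert, UpdateDelete, UpdateInsert, Insert];
    // Prints [[Insert, Insert, UpdateDelete, UpdateInsert], [Insert]]:
    // the first window grows from 3 to 4 rows to keep the pair intact.
    println!("{:?}", split_ops(&ops, 3));
}

Extending the window by one row when it would otherwise end on UpdateDelete is what the reverted code achieved by calling rechunk_head with chunk.capacity() + 1.
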
