Skip to content

Commit

Permalink
feat(stream): migrate ProjectExecutor to ExecutorV2 (#1810)
Browse files Browse the repository at this point in the history
* partly finish project migrating

* stream: migrate ProjectExecutor to ExecutorV2

* fix clippy
  • Loading branch information
WindowsXp-Beta authored Apr 13, 2022
1 parent 54f6fe8 commit 50f51fd
Show file tree
Hide file tree
Showing 7 changed files with 254 additions and 250 deletions.
8 changes: 5 additions & 3 deletions src/stream/src/executor/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ use super::*;
use crate::executor::test_utils::create_in_memory_keyspace;
use crate::executor_v2::receiver::ReceiverExecutor;
use crate::executor_v2::{
Executor as ExecutorV2, LocalSimpleAggExecutor, MergeExecutor, SimpleAggExecutor,
Executor as ExecutorV2, LocalSimpleAggExecutor, MergeExecutor, ProjectExecutor,
SimpleAggExecutor,
};
use crate::task::SharedContext;

Expand Down Expand Up @@ -164,7 +165,7 @@ async fn test_merger_sum_aggr() {
)
.v1();

let projection = ProjectExecutor::new(
let projection = Box::new(ProjectExecutor::new_from_v1(
Box::new(aggregator),
vec![],
vec![
Expand All @@ -173,7 +174,8 @@ async fn test_merger_sum_aggr() {
],
3,
"ProjectExecutor".to_string(),
);
))
.v1();
let items = Arc::new(Mutex::new(vec![]));
let consumer = MockConsumer::new(Box::new(projection), items.clone());
let context = SharedContext::for_test().into();
Expand Down
18 changes: 0 additions & 18 deletions src/stream/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -510,24 +510,6 @@ pub fn create_executor(
Ok(real_executor)
}

/// `SimpleExecutor` accepts a single chunk as input.
///
/// Implementors provide the per-chunk transformation plus access to their
/// upstream executor; `simple_executor_next` uses both to drive the executor.
pub trait SimpleExecutor: Executor {
/// Transforms one input chunk into the message that should be emitted for it.
fn consume_chunk(&mut self, chunk: StreamChunk) -> Result<Message>;
/// Returns the upstream executor from which the next message is pulled.
fn input(&mut self) -> &mut dyn Executor;
}

/// Most executors don't care about the control messages, and therefore
/// this method provides a default implementation helper for them.
///
/// Pulls the next message from `executor.input()`:
/// - a chunk is handed to `executor.consume_chunk` and its result is returned;
/// - a barrier is passed through untouched;
/// - an error from the input is propagated to the caller unchanged.
async fn simple_executor_next<E: SimpleExecutor>(executor: &mut E) -> Result<Message> {
    // `?` forwards the upstream error as-is, replacing the former
    // `Err(e) => Err(e)` arm that only re-wrapped it.
    match executor.input().next().await? {
        Message::Chunk(chunk) => executor.consume_chunk(chunk),
        barrier @ Message::Barrier(_) => Ok(barrier),
    }
}

/// `StreamConsumer` is the last step in an actor
#[async_trait]
pub trait StreamConsumer: Send + Debug + 'static {
Expand Down
234 changes: 15 additions & 219 deletions src/stream/src/executor/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,53 +12,19 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use async_trait::async_trait;
use itertools::Itertools;
use risingwave_common::array::column::Column;
use risingwave_common::array::DataChunk;
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::error::Result;
use risingwave_common::try_match_expand;
use risingwave_expr::expr::{build_from_prost, BoxedExpression};
use risingwave_expr::expr::build_from_prost;
use risingwave_pb::stream_plan;
use risingwave_pb::stream_plan::stream_node::Node;
use risingwave_storage::StateStore;

use super::{Executor, Message, PkIndicesRef, SimpleExecutor, StreamChunk};
use crate::executor::{ExecutorBuilder, PkIndices};
use super::Executor;
use crate::executor::ExecutorBuilder;
use crate::executor_v2::{Executor as ExecutorV2, ProjectExecutor as ProjectExecutorV2};
use crate::task::{ExecutorParams, LocalStreamManagerCore};

/// `ProjectExecutor` projects data with the expressions in `exprs`. Each expression
/// takes a chunk of data and returns one new column. `ProjectExecutor` then emits
/// a new chunk made of those columns, carrying the same insert/delete/update ops
/// as the input chunk, to the next operator.
pub struct ProjectExecutor {
/// Output schema: one unnamed field per expression, typed by the expression's return type.
schema: Schema,
/// Primary key indices, stored as passed to `new`.
pk_indices: PkIndices,

/// The input of the current operator
input: Box<dyn Executor>,
/// Expressions of the current projection.
exprs: Vec<BoxedExpression>,

/// Identity string, built in `new` as `"ProjectExecutor "` followed by the executor id in upper-case hex.
identity: String,

/// Logical operator info, returned verbatim by `logical_operator_info`.
op_info: String,
}

/// Debug output lists the schema, pk indices, input and expressions.
/// `identity` and `op_info` are intentionally not part of the output.
impl std::fmt::Debug for ProjectExecutor {
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut debug = formatter.debug_struct("ProjectExecutor");
        debug.field("schema", &self.schema);
        debug.field("pk_indices", &self.pk_indices);
        debug.field("input", &self.input);
        debug.field("exprs", &self.exprs);
        debug.finish()
    }
}

pub struct ProjectExecutorBuilder {}
pub struct ProjectExecutorBuilder;

impl ExecutorBuilder for ProjectExecutorBuilder {
fn new_boxed_executor(
Expand All @@ -73,185 +39,15 @@ impl ExecutorBuilder for ProjectExecutorBuilder {
.iter()
.map(build_from_prost)
.collect::<Result<Vec<_>>>()?;
Ok(Box::new(ProjectExecutor::new(
params.input.remove(0),
params.pk_indices,
project_exprs,
params.executor_id,
params.op_info,
)))
}
}

impl ProjectExecutor {
    /// Creates a projection over `input` that evaluates `exprs` on every chunk.
    ///
    /// The output schema is derived from the expressions: one unnamed field per
    /// expression, using the expression's return type. `executor_id` is only
    /// used to build the identity string (formatted as upper-case hex).
    pub fn new(
        input: Box<dyn Executor>,
        pk_indices: PkIndices,
        exprs: Vec<BoxedExpression>,
        executor_id: u64,
        op_info: String,
    ) -> Self {
        let fields = exprs
            .iter()
            .map(|expr| Field::unnamed(expr.return_type()))
            .collect_vec();
        let identity = format!("ProjectExecutor {:X}", executor_id);

        Self {
            schema: Schema { fields },
            pk_indices,
            input,
            exprs,
            identity,
            op_info,
        }
    }
}

#[async_trait]
impl Executor for ProjectExecutor {
    /// Produces the next message by delegating to the shared
    /// `simple_executor_next` helper.
    async fn next(&mut self) -> Result<Message> {
        super::simple_executor_next(self).await
    }

    /// Schema of the projected output.
    fn schema(&self) -> &Schema {
        &self.schema
    }

    /// Primary key indices of the output.
    fn pk_indices(&self) -> PkIndicesRef {
        &self.pk_indices
    }

    /// Identity string assigned at construction.
    fn identity(&self) -> &str {
        &self.identity
    }

    /// Logical operator info assigned at construction.
    fn logical_operator_info(&self) -> &str {
        self.op_info.as_str()
    }
}

impl SimpleExecutor for ProjectExecutor {
    /// Upstream executor that feeds this projection.
    fn input(&mut self) -> &mut dyn Executor {
        &mut *self.input
    }

    /// Compacts the incoming chunk, evaluates every projection expression
    /// against it and emits a new chunk with the original ops, the projected
    /// columns and no visibility bitmap.
    ///
    /// Fails with the first error returned by compaction or by an expression.
    fn consume_chunk(&mut self, chunk: StreamChunk) -> Result<Message> {
        let (ops, columns, visibility) = chunk.compact()?.into_inner();

        // Rebuild a plain data chunk the expressions can be evaluated on.
        let builder = DataChunk::builder().columns(columns);
        let data_chunk = match visibility {
            Some(bitmap) => builder.visibility(bitmap).build(),
            None => builder.build(),
        };

        // Evaluate expressions in order; stop at the first failure.
        let mut projected_columns: Vec<Column> = Vec::with_capacity(self.exprs.len());
        for expr in self.exprs.iter_mut() {
            let array = expr.eval(&data_chunk)?;
            projected_columns.push(Column::new(array));
        }

        drop(data_chunk);

        Ok(Message::Chunk(StreamChunk::new(
            ops,
            projected_columns,
            None,
        )))
    }
}

#[cfg(test)]
mod tests {
use itertools::Itertools;
use risingwave_common::array::{I64Array, *};
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::column_nonnull;
use risingwave_common::types::DataType;
use risingwave_expr::expr::expr_binary_nonnull::new_binary_expr;
use risingwave_expr::expr::InputRefExpression;
use risingwave_pb::expr::expr_node::Type;

use crate::executor::test_utils::MockSource;
use crate::executor::{Executor, Message, PkIndices, ProjectExecutor};

#[tokio::test]
async fn test_projection() {
let chunk1 = StreamChunk::new(
vec![Op::Insert, Op::Insert, Op::Insert],
vec![
column_nonnull! { I64Array, [1, 2, 3] },
column_nonnull! { I64Array, [4, 5, 6] },
],
None,
);
let chunk2 = StreamChunk::new(
vec![Op::Insert, Op::Delete],
vec![
column_nonnull! { I64Array, [7, 3] },
column_nonnull! { I64Array, [8, 6] },
],
Some((vec![true, true]).try_into().unwrap()),
);
let schema = Schema {
fields: vec![
Field::unnamed(DataType::Int64),
Field::unnamed(DataType::Int64),
],
};
let source = MockSource::with_chunks(schema, PkIndices::new(), vec![chunk1, chunk2]);

let left_expr = InputRefExpression::new(DataType::Int64, 0);
let right_expr = InputRefExpression::new(DataType::Int64, 1);
let test_expr = new_binary_expr(
Type::Add,
DataType::Int64,
Box::new(left_expr),
Box::new(right_expr),
);

let mut project = ProjectExecutor::new(
Box::new(source),
vec![],
vec![test_expr],
1,
"ProjectExecutor".to_string(),
);

if let Message::Chunk(chunk) = project.next().await.unwrap() {
assert_eq!(chunk.ops(), vec![Op::Insert, Op::Insert, Op::Insert]);
assert_eq!(chunk.columns().len(), 1);
assert_eq!(
chunk
.column_at(0)
.array_ref()
.as_int64()
.iter()
.collect_vec(),
vec![Some(5), Some(7), Some(9)]
);
} else {
unreachable!();
}

if let Message::Chunk(chunk) = project.next().await.unwrap() {
assert_eq!(chunk.ops(), vec![Op::Insert, Op::Delete]);
assert_eq!(chunk.columns().len(), 1);
assert_eq!(
chunk
.column_at(0)
.array_ref()
.as_int64()
.iter()
.collect_vec(),
vec![Some(15), Some(9)]
);
} else {
unreachable!();
}

assert!(project.next().await.unwrap().is_stop());
Ok(Box::new(
Box::new(ProjectExecutorV2::new_from_v1(
params.input.remove(0),
params.pk_indices,
project_exprs,
params.executor_id,
params.op_info,
))
.v1(),
))
}
}
9 changes: 0 additions & 9 deletions src/stream/src/executor/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,15 +70,6 @@ impl MockSource {
}
}

/// Builds a mock source that starts at epoch 0 and is pre-loaded with
/// `chunks`, each wrapped as a `Message::Chunk` in the given order.
pub fn with_chunks(schema: Schema, pk_indices: PkIndices, chunks: Vec<StreamChunk>) -> Self {
    let msgs = chunks.into_iter().map(Message::Chunk).collect();
    Self {
        schema,
        pk_indices,
        epoch: 0,
        msgs,
    }
}

/// Appends every chunk from `chunks` to the pending messages, wrapping each
/// one as a `Message::Chunk`.
pub fn push_chunks(&mut self, chunks: impl Iterator<Item = StreamChunk>) {
    let wrapped = chunks.map(Message::Chunk);
    self.msgs.extend(wrapped);
}
Expand Down
2 changes: 2 additions & 0 deletions src/stream/src/executor_v2/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ mod local_simple_agg;
mod lookup;
pub mod merge;
pub(crate) mod mview;
mod project;
#[allow(dead_code)]
mod rearranged_chain;
pub mod receiver;
Expand All @@ -57,6 +58,7 @@ pub use local_simple_agg::LocalSimpleAggExecutor;
pub use lookup::*;
pub use merge::MergeExecutor;
pub use mview::*;
pub use project::ProjectExecutor;
pub(crate) use simple::{SimpleExecutor, SimpleExecutorWrapper};
pub use top_n::TopNExecutor;
pub use top_n_appendonly::AppendOnlyTopNExecutor;
Expand Down
Loading

0 comments on commit 50f51fd

Please sign in to comment.