Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(stream): migrate ProjectExecutor to ExecutorV2 #1810

Merged
merged 4 commits into from
Apr 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions src/stream/src/executor/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ use super::*;
use crate::executor::test_utils::create_in_memory_keyspace;
use crate::executor_v2::receiver::ReceiverExecutor;
use crate::executor_v2::{
Executor as ExecutorV2, LocalSimpleAggExecutor, MergeExecutor, SimpleAggExecutor,
Executor as ExecutorV2, LocalSimpleAggExecutor, MergeExecutor, ProjectExecutor,
SimpleAggExecutor,
};
use crate::task::SharedContext;

Expand Down Expand Up @@ -164,7 +165,7 @@ async fn test_merger_sum_aggr() {
)
.v1();

let projection = ProjectExecutor::new(
let projection = Box::new(ProjectExecutor::new_from_v1(
Box::new(aggregator),
vec![],
vec![
Expand All @@ -173,7 +174,8 @@ async fn test_merger_sum_aggr() {
],
3,
"ProjectExecutor".to_string(),
);
))
.v1();
let items = Arc::new(Mutex::new(vec![]));
let consumer = MockConsumer::new(Box::new(projection), items.clone());
let context = SharedContext::for_test().into();
Expand Down
18 changes: 0 additions & 18 deletions src/stream/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -510,24 +510,6 @@ pub fn create_executor(
Ok(real_executor)
}

/// `SimpleExecutor` accepts a single chunk as input.
///
/// Implementors provide the per-chunk transformation and expose their upstream
/// executor; `simple_executor_next` combines the two to produce the next message.
pub trait SimpleExecutor: Executor {
/// Processes one `StreamChunk` and returns the resulting `Message`, or an error.
fn consume_chunk(&mut self, chunk: StreamChunk) -> Result<Message>;
/// Returns a mutable reference to the executor whose messages this one consumes.
fn input(&mut self) -> &mut dyn Executor;
}

/// Default `next` helper for executors that do not care about control messages.
///
/// Pulls one message from the executor's input. Data chunks are handed to
/// `consume_chunk`; barriers are passed through untouched. An error from the
/// input is returned to the caller unchanged.
async fn simple_executor_next<E: SimpleExecutor>(executor: &mut E) -> Result<Message> {
    let message = executor.input().next().await?;
    match message {
        Message::Chunk(chunk) => executor.consume_chunk(chunk),
        barrier @ Message::Barrier(_) => Ok(barrier),
    }
}

/// `StreamConsumer` is the last step in an actor
#[async_trait]
pub trait StreamConsumer: Send + Debug + 'static {
Expand Down
234 changes: 15 additions & 219 deletions src/stream/src/executor/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,53 +12,19 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use async_trait::async_trait;
use itertools::Itertools;
use risingwave_common::array::column::Column;
use risingwave_common::array::DataChunk;
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::error::Result;
use risingwave_common::try_match_expand;
use risingwave_expr::expr::{build_from_prost, BoxedExpression};
use risingwave_expr::expr::build_from_prost;
use risingwave_pb::stream_plan;
use risingwave_pb::stream_plan::stream_node::Node;
use risingwave_storage::StateStore;

use super::{Executor, Message, PkIndicesRef, SimpleExecutor, StreamChunk};
use crate::executor::{ExecutorBuilder, PkIndices};
use super::Executor;
use crate::executor::ExecutorBuilder;
use crate::executor_v2::{Executor as ExecutorV2, ProjectExecutor as ProjectExecutorV2};
use crate::task::{ExecutorParams, LocalStreamManagerCore};

/// `ProjectExecutor` projects data with its `exprs`. Each expression takes a chunk of data
/// and returns a new column; the projected columns are then passed to the next operator as
/// a new chunk that carries the same insert/delete/update ops as the input chunk.
pub struct ProjectExecutor {
// Output schema: one unnamed field per expression, typed by that expression's return type.
schema: Schema,
// Indices handed back by `pk_indices()`; supplied by the caller at construction.
pk_indices: PkIndices,

/// The input of the current operator.
input: Box<dyn Executor>,
/// Expressions of the current projection.
exprs: Vec<BoxedExpression>,

/// Identity string, built as `ProjectExecutor ` followed by the executor id in upper-case hex.
identity: String,

/// Logical operator info, returned by `logical_operator_info()`.
op_info: String,
}

/// Debug output lists the schema, pk indices, input and expressions.
/// The `identity` and `op_info` strings are intentionally not part of it.
impl std::fmt::Debug for ProjectExecutor {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("ProjectExecutor");
        out.field("schema", &self.schema);
        out.field("pk_indices", &self.pk_indices);
        out.field("input", &self.input);
        out.field("exprs", &self.exprs);
        out.finish()
    }
}

pub struct ProjectExecutorBuilder {}
pub struct ProjectExecutorBuilder;

impl ExecutorBuilder for ProjectExecutorBuilder {
fn new_boxed_executor(
Expand All @@ -73,185 +39,15 @@ impl ExecutorBuilder for ProjectExecutorBuilder {
.iter()
.map(build_from_prost)
.collect::<Result<Vec<_>>>()?;
Ok(Box::new(ProjectExecutor::new(
params.input.remove(0),
params.pk_indices,
project_exprs,
params.executor_id,
params.op_info,
)))
}
}

impl ProjectExecutor {
    /// Creates a projection over `input` that evaluates `exprs` on every chunk.
    ///
    /// The output schema has one unnamed field per expression, typed by the
    /// expression's return type. `executor_id` is rendered in upper-case hex
    /// inside the identity string; `pk_indices` and `op_info` are stored as given.
    pub fn new(
        input: Box<dyn Executor>,
        pk_indices: PkIndices,
        exprs: Vec<BoxedExpression>,
        executor_id: u64,
        op_info: String,
    ) -> Self {
        let fields = exprs
            .iter()
            .map(|expr| Field::unnamed(expr.return_type()))
            .collect::<Vec<_>>();
        let identity = format!("ProjectExecutor {:X}", executor_id);

        Self {
            schema: Schema { fields },
            pk_indices,
            input,
            exprs,
            identity,
            op_info,
        }
    }
}

#[async_trait]
impl Executor for ProjectExecutor {
    /// Produces the next message by delegating to the shared
    /// `simple_executor_next` helper.
    async fn next(&mut self) -> Result<Message> {
        super::simple_executor_next(self).await
    }

    /// Schema of the projected output.
    fn schema(&self) -> &Schema {
        &self.schema
    }

    /// Primary-key indices given at construction.
    fn pk_indices(&self) -> PkIndicesRef {
        &self.pk_indices
    }

    /// Identity string of this executor.
    fn identity(&self) -> &str {
        &self.identity
    }

    /// Logical operator info given at construction.
    fn logical_operator_info(&self) -> &str {
        self.op_info.as_str()
    }
}

impl SimpleExecutor for ProjectExecutor {
    /// The upstream executor this projection reads from.
    fn input(&mut self) -> &mut dyn Executor {
        &mut *self.input
    }

    /// Evaluates every projection expression over `chunk` and returns the
    /// projected columns as a new chunk with the same ops and no visibility map.
    ///
    /// The chunk is compacted first. The first failing expression aborts the
    /// projection and its error is returned.
    fn consume_chunk(&mut self, chunk: StreamChunk) -> Result<Message> {
        let (ops, columns, visibility) = chunk.compact()?.into_inner();

        let builder = DataChunk::builder().columns(columns);
        let data_chunk = match visibility {
            Some(vis) => builder.visibility(vis).build(),
            None => builder.build(),
        };

        let mut projected_columns: Vec<Column> = Vec::new();
        for expr in self.exprs.iter_mut() {
            let array = expr.eval(&data_chunk)?;
            projected_columns.push(Column::new(array));
        }

        // Release the input data before the output chunk is assembled.
        drop(data_chunk);

        Ok(Message::Chunk(StreamChunk::new(
            ops,
            projected_columns,
            None,
        )))
    }
}

#[cfg(test)]
mod tests {
use itertools::Itertools;
use risingwave_common::array::{I64Array, *};
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::column_nonnull;
use risingwave_common::types::DataType;
use risingwave_expr::expr::expr_binary_nonnull::new_binary_expr;
use risingwave_expr::expr::InputRefExpression;
use risingwave_pb::expr::expr_node::Type;

use crate::executor::test_utils::MockSource;
use crate::executor::{Executor, Message, PkIndices, ProjectExecutor};

#[tokio::test]
async fn test_projection() {
let chunk1 = StreamChunk::new(
vec![Op::Insert, Op::Insert, Op::Insert],
vec![
column_nonnull! { I64Array, [1, 2, 3] },
column_nonnull! { I64Array, [4, 5, 6] },
],
None,
);
let chunk2 = StreamChunk::new(
vec![Op::Insert, Op::Delete],
vec![
column_nonnull! { I64Array, [7, 3] },
column_nonnull! { I64Array, [8, 6] },
],
Some((vec![true, true]).try_into().unwrap()),
);
let schema = Schema {
fields: vec![
Field::unnamed(DataType::Int64),
Field::unnamed(DataType::Int64),
],
};
let source = MockSource::with_chunks(schema, PkIndices::new(), vec![chunk1, chunk2]);

let left_expr = InputRefExpression::new(DataType::Int64, 0);
let right_expr = InputRefExpression::new(DataType::Int64, 1);
let test_expr = new_binary_expr(
Type::Add,
DataType::Int64,
Box::new(left_expr),
Box::new(right_expr),
);

let mut project = ProjectExecutor::new(
Box::new(source),
vec![],
vec![test_expr],
1,
"ProjectExecutor".to_string(),
);

if let Message::Chunk(chunk) = project.next().await.unwrap() {
assert_eq!(chunk.ops(), vec![Op::Insert, Op::Insert, Op::Insert]);
assert_eq!(chunk.columns().len(), 1);
assert_eq!(
chunk
.column_at(0)
.array_ref()
.as_int64()
.iter()
.collect_vec(),
vec![Some(5), Some(7), Some(9)]
);
} else {
unreachable!();
}

if let Message::Chunk(chunk) = project.next().await.unwrap() {
assert_eq!(chunk.ops(), vec![Op::Insert, Op::Delete]);
assert_eq!(chunk.columns().len(), 1);
assert_eq!(
chunk
.column_at(0)
.array_ref()
.as_int64()
.iter()
.collect_vec(),
vec![Some(15), Some(9)]
);
} else {
unreachable!();
}

assert!(project.next().await.unwrap().is_stop());
Ok(Box::new(
Box::new(ProjectExecutorV2::new_from_v1(
params.input.remove(0),
params.pk_indices,
project_exprs,
params.executor_id,
params.op_info,
))
.v1(),
))
}
}
9 changes: 0 additions & 9 deletions src/stream/src/executor/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,15 +70,6 @@ impl MockSource {
}
}

/// Builds a mock source pre-loaded with `chunks`, each wrapped as a
/// `Message::Chunk` in the given order. The epoch starts at 0.
pub fn with_chunks(schema: Schema, pk_indices: PkIndices, chunks: Vec<StreamChunk>) -> Self {
    let msgs = chunks.into_iter().map(Message::Chunk).collect();
    Self {
        schema,
        pk_indices,
        epoch: 0,
        msgs,
    }
}

pub fn push_chunks(&mut self, chunks: impl Iterator<Item = StreamChunk>) {
self.msgs.extend(chunks.map(Message::Chunk));
}
Expand Down
2 changes: 2 additions & 0 deletions src/stream/src/executor_v2/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ mod local_simple_agg;
mod lookup;
pub mod merge;
pub(crate) mod mview;
mod project;
#[allow(dead_code)]
mod rearranged_chain;
pub mod receiver;
Expand All @@ -57,6 +58,7 @@ pub use local_simple_agg::LocalSimpleAggExecutor;
pub use lookup::*;
pub use merge::MergeExecutor;
pub use mview::*;
pub use project::ProjectExecutor;
pub(crate) use simple::{SimpleExecutor, SimpleExecutorWrapper};
pub use top_n::TopNExecutor;
pub use top_n_appendonly::AppendOnlyTopNExecutor;
Expand Down
Loading