Skip to content

Commit

Permalink
refactor(batch): return Err when polling finished executor (#1544)
Browse files Browse the repository at this point in the history
  • Loading branch information
Enter-tainer authored Apr 2, 2022
1 parent 41c8c30 commit 8573cfa
Show file tree
Hide file tree
Showing 24 changed files with 384 additions and 178 deletions.
29 changes: 16 additions & 13 deletions rust/batch/src/executor/create_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,19 +136,22 @@ impl BoxedExecutorBuilder for CreateSourceExecutor {
None
};

Ok(Box::new(Self {
table_id,
config,
format,
source_manager: source.global_batch_env().source_manager_ref(),
columns,
schema: Schema { fields: vec![] },
properties: properties.clone(),
schema_location: schema_location.clone(),
parser: None,
row_id_index,
identity: "CreateSourceExecutor".to_string(),
}))
Ok(Box::new(
Self {
table_id,
config,
format,
source_manager: source.global_batch_env().source_manager_ref(),
columns,
schema: Schema { fields: vec![] },
properties: properties.clone(),
schema_location: schema_location.clone(),
parser: None,
row_id_index,
identity: "CreateSourceExecutor".to_string(),
}
.fuse(),
))
}
}

Expand Down
17 changes: 10 additions & 7 deletions rust/batch/src/executor/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,16 @@ impl BoxedExecutorBuilder for CreateTableExecutor {

let table_id = TableId::from(&node.table_ref_id);

Ok(Box::new(Self {
table_id,
source_manager: source.global_batch_env().source_manager_ref(),
table_columns: node.column_descs.clone(),
identity: "CreateTableExecutor".to_string(),
info: node.info.clone().unwrap(),
}))
Ok(Box::new(
Self {
table_id,
source_manager: source.global_batch_env().source_manager_ref(),
table_columns: node.column_descs.clone(),
identity: "CreateTableExecutor".to_string(),
info: node.info.clone().unwrap(),
}
.fuse(),
))
}
}

Expand Down
13 changes: 8 additions & 5 deletions rust/batch/src/executor/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,11 +140,14 @@ impl BoxedExecutorBuilder for DeleteExecutor {
})?;
let child = source.clone_for_plan(proto_child).build()?;

Ok(Box::new(Self::new(
table_id,
source.global_batch_env().source_manager_ref(),
child,
)))
Ok(Box::new(
Self::new(
table_id,
source.global_batch_env().source_manager_ref(),
child,
)
.fuse(),
))
}
}

Expand Down
15 changes: 9 additions & 6 deletions rust/batch/src/executor/drop_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,14 @@ impl BoxedExecutorBuilder for DropStreamExecutor {

let table_id = TableId::from(&node.table_ref_id);

Ok(Box::new(Self {
table_id,
source_manager: source.global_batch_env().source_manager_ref(),
schema: Schema { fields: vec![] },
identity: "DropStreamExecutor".to_string(),
}))
Ok(Box::new(
Self {
table_id,
source_manager: source.global_batch_env().source_manager_ref(),
schema: Schema { fields: vec![] },
identity: "DropStreamExecutor".to_string(),
}
.fuse(),
))
}
}
13 changes: 8 additions & 5 deletions rust/batch/src/executor/drop_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,14 @@ impl BoxedExecutorBuilder for DropTableExecutor {

let table_id = TableId::from(&node.table_ref_id);

Ok(Box::new(Self {
table_id,
schema: Schema { fields: vec![] },
identity: "DropTableExecutor".to_string(),
}))
Ok(Box::new(
Self {
table_id,
schema: Schema { fields: vec![] },
identity: "DropTableExecutor".to_string(),
}
.fuse(),
))
}
}

Expand Down
19 changes: 11 additions & 8 deletions rust/batch/src/executor/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,14 +125,17 @@ impl BoxedExecutorBuilder for FilterExecutor {
let chunk_builder =
DataChunkBuilder::new(child.schema().data_types(), DEFAULT_CHUNK_BUFFER_SIZE);

return Ok(Box::new(Self {
expr,
child,
chunk_builder,
last_input: None,
identity: source.plan_node().get_identity().clone(),
child_can_be_nexted: true,
}));
return Ok(Box::new(
Self {
expr,
child,
chunk_builder,
last_input: None,
identity: source.plan_node().get_identity().clone(),
child_can_be_nexted: true,
}
.fuse(),
));
}
Err(InternalError("Filter must have one children".to_string()).into())
}
Expand Down
133 changes: 133 additions & 0 deletions rust/batch/src/executor/fuse.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
// Copyright 2022 Singularity Data
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_common::array::DataChunk;
use risingwave_common::catalog::Schema;
use risingwave_common::error::ErrorCode::InternalError;
use risingwave_common::error::Result;

use crate::executor::Executor;

/// Wrapper around an [`Executor`] that makes polling past the end an error.
///
/// Once a call to `next` has returned `Ok(None)`, every later call to `next`
/// on the wrapper returns an error.
pub struct FusedExecutor<T>
where
    T: Executor,
{
    /// The wrapped executor that all calls are forwarded to.
    inner: T,
    /// `true` once the wrapped executor has returned `Ok(None)`; while set,
    /// `next` returns an error instead of polling the wrapped executor.
    invalid: bool,
}

impl<T: Executor> FusedExecutor<T> {
    /// Wraps `executor`; the wrapper starts out not yet marked as finished.
    pub fn new(executor: T) -> Self {
        let inner = executor;
        Self {
            inner,
            invalid: false,
        }
    }
}

#[async_trait::async_trait]
impl<T: Executor> Executor for FusedExecutor<T> {
    /// Forwards to the wrapped executor's `open`.
    async fn open(&mut self) -> Result<()> {
        self.inner.open().await
    }

    /// Polls the wrapped executor for its next chunk.
    ///
    /// Returns an internal error if the wrapped executor has already returned
    /// `Ok(None)` through this wrapper. An error coming from the wrapped
    /// executor itself is propagated unchanged and does not mark the wrapper
    /// as finished.
    async fn next(&mut self) -> Result<Option<DataChunk>> {
        // Guard: the wrapped executor already finished, so do not poll it again.
        if self.invalid {
            return Err(InternalError("Polling an already finished executor".to_string()).into());
        }

        let chunk = self.inner.next().await?;
        // `Ok(None)` marks the end of the output; remember it so that all
        // subsequent calls take the guard above and return `Err`.
        if chunk.is_none() {
            self.invalid = true;
        }
        Ok(chunk)
    }

    /// Forwards to the wrapped executor's `close`.
    async fn close(&mut self) -> Result<()> {
        self.inner.close().await
    }

    /// Schema reported by the wrapped executor.
    fn schema(&self) -> &Schema {
        self.inner.schema()
    }

    /// Identity string reported by the wrapped executor.
    fn identity(&self) -> &str {
        self.inner.identity()
    }
}

#[cfg(test)]
mod tests {

    use std::sync::Arc;

    use assert_matches::assert_matches;
    use risingwave_common::array::column::Column;
    use risingwave_common::array::{Array, DataChunk, PrimitiveArray};
    use risingwave_common::catalog::{Field, Schema};
    use risingwave_common::types::DataType;

    use super::*;
    use crate::executor::test_utils::MockExecutor;

    /// Drives a fused mock executor holding a single two-column chunk: the
    /// first poll yields the chunk, the second yields `None`, and the third
    /// must fail.
    #[tokio::test]
    async fn test_fused_executor() {
        let columns = vec![
            create_column(&[Some(2), Some(2)]).unwrap(),
            create_column(&[Some(1), Some(2)]).unwrap(),
        ];
        let data_chunk = DataChunk::builder().columns(columns).build();
        let schema = Schema {
            fields: vec![
                Field::unnamed(DataType::Int32),
                Field::unnamed(DataType::Int32),
            ],
        };
        let mut fused = MockExecutor::with_chunk(data_chunk, schema).fuse();

        // The wrapper must expose the wrapped executor's schema.
        let fields = &fused.schema().fields;
        for idx in 0..2 {
            assert_eq!(fields[idx].data_type, DataType::Int32);
        }

        fused.open().await.unwrap();

        // First poll: the chunk comes back with its contents intact.
        let first = fused.next().await.unwrap();
        assert_matches!(first, Some(_));
        let chunk = first.unwrap();
        let expected_columns = [[Some(2), Some(2)], [Some(1), Some(2)]];
        for (col_idx, expected) in expected_columns.iter().enumerate() {
            let column = chunk.column_at(col_idx);
            let array = column.array();
            let values = array.as_int32();
            assert_eq!(values.len(), expected.len());
            for (row_idx, want) in expected.iter().enumerate() {
                assert_eq!(values.value_at(row_idx), *want);
            }
        }

        // Second poll: the executor reports that it is finished.
        let second = fused.next().await.unwrap();
        assert_matches!(second, None);

        // Third poll: polling a finished executor is an error.
        let third = fused.next().await;
        assert_matches!(third, Err(_));

        fused.close().await.unwrap();
    }

    /// Builds an `i32` column from a slice of optional values.
    fn create_column(vec: &[Option<i32>]) -> Result<Column> {
        let array = PrimitiveArray::from_slice(vec)?;
        Ok(Column::new(Arc::new(array.into())))
    }
}
19 changes: 11 additions & 8 deletions rust/batch/src/executor/generate_series.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,17 @@ impl BoxedExecutorBuilder for GenerateSeriesI32Executor {
NodeBody::GenerateInt32Series
)?;

Ok(Box::new(Self {
start: node.start,
stop: node.stop,
step: node.step,
cur: node.start,
schema: Schema::new(vec![Field::unnamed(DataType::Int32)]),
identity: source.plan_node().get_identity().clone(),
}))
Ok(Box::new(
Self {
start: node.start,
stop: node.stop,
step: node.step,
cur: node.start,
schema: Schema::new(vec![Field::unnamed(DataType::Int32)]),
identity: source.plan_node().get_identity().clone(),
}
.fuse(),
))
}
}

Expand Down
25 changes: 14 additions & 11 deletions rust/batch/src/executor/generic_exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,17 +96,20 @@ impl<CS: 'static + CreateSource> BoxedExecutorBuilder for GenericExchangeExecuto
let sources: Vec<ProstExchangeSource> = node.get_sources().to_vec();
let input_schema: Vec<NodeField> = node.get_input_schema().to_vec();
let fields = input_schema.iter().map(Field::from).collect::<Vec<Field>>();
Ok(Box::new(Self {
sources,
server_addr,
env: source.env.clone(),
source_creator: PhantomData,
source_idx: 0,
current_source: None,
schema: Schema { fields },
task_id: source.task_id.clone(),
identity: source.plan_node().get_identity().clone(),
}))
Ok(Box::new(
Self {
sources,
server_addr,
env: source.env.clone(),
source_creator: PhantomData,
source_idx: 0,
current_source: None,
schema: Schema { fields },
task_id: source.task_id.clone(),
identity: source.plan_node().get_identity().clone(),
}
.fuse(),
))
}
}

Expand Down
2 changes: 1 addition & 1 deletion rust/batch/src/executor/hash_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ impl HashKeyDispatcher for HashAggExecutorBuilderDispatcher {
type Output = BoxedExecutor;

fn dispatch<K: HashKey>(input: HashAggExecutorBuilder) -> Self::Output {
Box::new(HashAggExecutor::<K>::new(input))
Box::new(HashAggExecutor::<K>::new(input).fuse())
}
}

Expand Down
17 changes: 10 additions & 7 deletions rust/batch/src/executor/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,13 +176,16 @@ impl BoxedExecutorBuilder for InsertExecutor {
})?;
let child = source.clone_for_plan(proto_child).build()?;

Ok(Box::new(Self::new(
table_id,
source.global_batch_env().source_manager_ref(),
child,
source.global_batch_env().worker_id(),
insert_node.frontend_v2,
)))
Ok(Box::new(
Self::new(
table_id,
source.global_batch_env().source_manager_ref(),
child,
source.global_batch_env().worker_id(),
insert_node.frontend_v2,
)
.fuse(),
))
}
}

Expand Down
17 changes: 10 additions & 7 deletions rust/batch/src/executor/join/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -336,13 +336,16 @@ impl HashKeyDispatcher for HashJoinExecutorBuilderDispatcher {
type Output = BoxedExecutor;

fn dispatch<K: HashKey>(input: HashJoinExecutorBuilder) -> Self::Output {
Box::new(HashJoinExecutor::<K>::new(
input.left_child,
input.right_child,
input.params,
input.schema,
format!("HashJoinExecutor{:?}", input.task_id),
))
Box::new(
HashJoinExecutor::<K>::new(
input.left_child,
input.right_child,
input.params,
input.schema,
format!("HashJoinExecutor{:?}", input.task_id),
)
.fuse(),
)
}
}

Expand Down
Loading

0 comments on commit 8573cfa

Please sign in to comment.