Skip to content

Commit

Permalink
refactor: introduce per-crate error for batch executors (#3658)
Browse files Browse the repository at this point in the history
* introduce error.rs in batch/executor

* modify error type

* modify mod.rs to use BatchExecutorError

* modify error to be compatible with RwError

* modify delete executor

* modify filter

* modify generic_exchange

* modify hop_window

* modify hash_agg

* modify insert

* modify

* modify

* modify error

* fix clippy

* rename BatchExecutorError to BatchError

* fix format

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
ZENOTME and mergify[bot] authored Jul 5, 2022
1 parent 085d256 commit 67dbe53
Show file tree
Hide file tree
Showing 15 changed files with 113 additions and 66 deletions.
46 changes: 46 additions & 0 deletions src/batch/src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright 2022 Singularity Data
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

pub use anyhow::anyhow;
use risingwave_common::array::ArrayError;
use risingwave_common::error::{ErrorCode, RwError};
use thiserror::Error;

/// Result type whose error variant is fixed to [`BatchError`].
pub type Result<T> = std::result::Result<T, BatchError>;

// NOTE(review): trait aliases are an unstable language feature (`trait_alias`);
// presumably the crate root enables it — confirm.
/// Shorthand bound for any `std::error::Error` that is also `Send + Sync + 'static`.
pub trait Error = std::error::Error + Send + Sync + 'static;

/// Error type declared in this module; its `Display` output is produced by the
/// `#[error(...)]` attribute on each variant.
#[derive(Error, Debug)]
pub enum BatchError {
/// Carries a free-form description; displayed as `Unsupported function: <description>`.
#[error("Unsupported function: {0}")]
UnsupportedFunction(String),

/// Carries two static strings; displayed as `Can't cast <first> to <second>`.
#[error("Can't cast {0} to {1}")]
Cast(&'static str, &'static str),

/// Wraps an [`ArrayError`]; `#[from]` lets `?` convert an `ArrayError` into this variant.
#[error("Array error: {0}")]
Array(#[from] ArrayError),

/// Carries no payload; displayed as `Out of range`.
#[error("Out of range")]
NumericOutOfRange,

/// Wraps an arbitrary [`anyhow::Error`]. `transparent` forwards `Display` and
/// `source()` to the wrapped error, and `#[from]` lets `?` convert an
/// `anyhow::Error` into this variant.
#[error(transparent)]
Internal(#[from] anyhow::Error),
}

/// Converts a [`BatchError`] into an [`RwError`] by boxing it inside
/// `ErrorCode::BatchError`.
impl From<BatchError> for RwError {
    fn from(batch_err: BatchError) -> Self {
        // Box the error to build the `ErrorCode::BatchError` variant, then rely
        // on the existing `ErrorCode` -> `RwError` conversion.
        let code = ErrorCode::BatchError(Box::new(batch_err));
        code.into()
    }
}
13 changes: 5 additions & 8 deletions src/batch/src/executor/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,17 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use anyhow::anyhow;
use futures::future::try_join_all;
use futures_async_stream::try_stream;
use risingwave_common::array::{ArrayBuilder, DataChunk, Op, PrimitiveArrayBuilder, StreamChunk};
use risingwave_common::catalog::{Field, Schema, TableId};
use risingwave_common::error::ErrorCode::InternalError;
use risingwave_common::error::{ErrorCode, Result, RwError};
use risingwave_common::error::{Result, RwError};
use risingwave_common::types::DataType;
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_source::SourceManagerRef;

use crate::error::BatchError;
use crate::executor::{
BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder,
};
Expand Down Expand Up @@ -90,11 +91,7 @@ impl DeleteExecutor {
// Wait for all chunks to be taken / written.
let rows_deleted = try_join_all(notifiers)
.await
.map_err(|_| {
RwError::from(ErrorCode::InternalError(
"failed to wait chunks to be written".to_owned(),
))
})?
.map_err(|_| BatchError::Internal(anyhow!("failed to wait chunks to be written")))?
.into_iter()
.sum::<usize>();

Expand Down Expand Up @@ -130,7 +127,7 @@ impl BoxedExecutorBuilder for DeleteExecutor {
source
.context()
.source_manager_ref()
.ok_or_else(|| InternalError("Source manager not found".to_string()))?,
.ok_or_else(|| BatchError::Internal(anyhow!("Source manager not found")))?,
inputs.remove(0),
)))
}
Expand Down
7 changes: 5 additions & 2 deletions src/batch/src/executor/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,17 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use anyhow::anyhow;
use futures_async_stream::try_stream;
use risingwave_common::array::ArrayImpl::Bool;
use risingwave_common::array::{Array, DataChunk};
use risingwave_common::catalog::Schema;
use risingwave_common::error::ErrorCode::InternalError;
use risingwave_common::error::{Result, RwError};
use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
use risingwave_expr::expr::{build_from_prost, BoxedExpression};
use risingwave_pb::batch_plan::plan_node::NodeBody;

use crate::error::BatchError;
use crate::executor::{
BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder,
};
Expand Down Expand Up @@ -66,7 +67,9 @@ impl FilterExecutor {
yield data_chunk?;
}
} else {
return Err(InternalError("Filter can only receive bool array".to_string()).into());
return Err(
BatchError::Internal(anyhow!("Filter can only receive bool array")).into(),
);
}
}

Expand Down
1 change: 0 additions & 1 deletion src/batch/src/executor/generic_exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ use crate::task::{BatchTaskContext, TaskId};

pub type ExchangeExecutor<C> = GenericExchangeExecutor<DefaultCreateSource, C>;
use crate::executor::{BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor};

pub struct GenericExchangeExecutor<CS, C> {
sources: Vec<ProstExchangeSource>,
context: C,
Expand Down
20 changes: 11 additions & 9 deletions src/batch/src/executor/hop_window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,24 +14,25 @@

use std::num::NonZeroUsize;

use anyhow::anyhow;
use futures_async_stream::try_stream;
use itertools::Itertools;
use num_traits::CheckedSub;
use risingwave_common::array::column::Column;
use risingwave_common::array::{DataChunk, Vis};
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::error::{ErrorCode, Result, RwError};
use risingwave_common::error::{Result, RwError};
use risingwave_common::types::{DataType, IntervalUnit, ScalarImpl};
use risingwave_expr::expr::expr_binary_nonnull::new_binary_expr;
use risingwave_expr::expr::{Expression, InputRefExpression, LiteralExpression};
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::expr::expr_node;

use crate::error::BatchError;
use crate::executor::{
BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder,
};
use crate::task::BatchTaskContext;

pub struct HopWindowExecutor {
child: BoxedExecutor,
identity: String,
Expand Down Expand Up @@ -144,7 +145,7 @@ impl HopWindowExecutor {
.exact_div(&window_slide)
.and_then(|x| NonZeroUsize::new(usize::try_from(x).ok()?))
.ok_or_else(|| {
RwError::from(ErrorCode::InternalError(format!(
BatchError::Internal(anyhow!(format!(
"window_size {} cannot be divided by window_slide {}",
window_size, window_slide
)))
Expand All @@ -162,7 +163,7 @@ impl HopWindowExecutor {
// tumble_start(`time_col` - (`window_size` - `window_slide`), `window_slide`).
// Let's pre calculate (`window_size` - `window_slide`).
let window_size_sub_slide = window_size.checked_sub(&window_slide).ok_or_else(|| {
RwError::from(ErrorCode::InternalError(format!(
BatchError::Internal(anyhow!(format!(
"window_size {} cannot be subtracted by window_slide {}",
window_size, window_slide
)))
Expand All @@ -189,7 +190,7 @@ impl HopWindowExecutor {

for i in 0..units {
let window_start_offset = window_slide.checked_mul_int(i).ok_or_else(|| {
RwError::from(ErrorCode::InternalError(format!(
BatchError::Internal(anyhow!(format!(
"window_slide {} cannot be multiplied by {}",
window_slide, i
)))
Expand All @@ -200,7 +201,7 @@ impl HopWindowExecutor {
)
.boxed();
let window_end_offset = window_slide.checked_mul_int(i + units).ok_or_else(|| {
RwError::from(ErrorCode::InternalError(format!(
BatchError::Internal(anyhow!(format!(
"window_slide {} cannot be multiplied by {}",
window_slide, i
)))
Expand Down Expand Up @@ -231,9 +232,10 @@ impl HopWindowExecutor {
let contains_window_end = output_indices.contains(&window_end_col_index);
if !contains_window_start && !contains_window_end {
// make sure that either window_start or window_end is in output indices.
return Err(RwError::from(ErrorCode::InternalError(
"neither window_start or window_end is in output_indices".to_string(),
)));
return Err(BatchError::Internal(anyhow!(
"neither window_start or window_end is in output_indices"
))
.into());
}
#[for_await]
for data_chunk in child.execute() {
Expand Down
14 changes: 5 additions & 9 deletions src/batch/src/executor/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,24 +14,24 @@

use std::iter::once;

use anyhow::anyhow;
use futures::future::try_join_all;
use futures_async_stream::try_stream;
use risingwave_common::array::column::Column;
use risingwave_common::array::{
ArrayBuilder, DataChunk, I64ArrayBuilder, Op, PrimitiveArrayBuilder, StreamChunk,
};
use risingwave_common::catalog::{Field, Schema, TableId};
use risingwave_common::error::ErrorCode::InternalError;
use risingwave_common::error::{ErrorCode, Result, RwError};
use risingwave_common::error::{Result, RwError};
use risingwave_common::types::DataType;
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_source::SourceManagerRef;

use crate::error::BatchError;
use crate::executor::{
BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, Executor, ExecutorBuilder,
};
use crate::task::BatchTaskContext;

/// [`InsertExecutor`] implements table insertion with values from its child executor.
pub struct InsertExecutor {
/// Target table id.
Expand Down Expand Up @@ -107,11 +107,7 @@ impl InsertExecutor {
// Wait for all chunks to be taken / written.
let rows_inserted = try_join_all(notifiers)
.await
.map_err(|_| {
RwError::from(ErrorCode::InternalError(
"failed to wait chunks to be written".to_owned(),
))
})?
.map_err(|_| BatchError::Internal(anyhow!("failed to wait chunks to be written")))?
.into_iter()
.sum::<usize>();

Expand Down Expand Up @@ -147,7 +143,7 @@ impl BoxedExecutorBuilder for InsertExecutor {
source
.context()
.source_manager_ref()
.ok_or_else(|| InternalError("Source manager not found".to_string()))?,
.ok_or_else(|| BatchError::Internal(anyhow!("Source manager not found")))?,
inputs.remove(0),
)))
}
Expand Down
21 changes: 10 additions & 11 deletions src/batch/src/executor/join/nested_loop_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@ use risingwave_common::array::column::Column;
use risingwave_common::array::data_chunk_iter::RowRef;
use risingwave_common::array::{Array, DataChunk, Row, Vis};
use risingwave_common::catalog::Schema;
use risingwave_common::error::{ErrorCode, Result, RwError};
use risingwave_common::error::{Result, RwError};
use risingwave_common::types::{DataType, DatumRef};
use risingwave_common::util::chunk_coalesce::{DataChunkBuilder, SlicedDataChunk};
use risingwave_expr::expr::{build_from_prost as expr_build_from_prost, BoxedExpression};
use risingwave_pb::batch_plan::plan_node::NodeBody;

use crate::error::BatchError;
use crate::executor::join::chunked_data::RowId;
use crate::executor::join::row_level_iter::RowLevelIter;
use crate::executor::join::JoinType;
Expand Down Expand Up @@ -252,10 +253,10 @@ impl BoxedExecutorBuilder for NestedLoopJoinExecutor {
"NestedLoopJoinExecutor2".to_string(),
)))
}
_ => Err(ErrorCode::NotImplemented(
format!("Do not support {:?} join type now.", join_type),
None.into(),
)
_ => Err(BatchError::UnsupportedFunction(format!(
"Do not support {:?} join type now.",
join_type
))
.into()),
}
}
Expand Down Expand Up @@ -319,9 +320,8 @@ impl NestedLoopJoinExecutor {
JoinType::RightOuter => self.do_right_outer_join(),
JoinType::RightSemi => self.do_right_semi_join(),
JoinType::RightAnti => self.do_right_anti_join(),
_ => Err(ErrorCode::NotImplemented(
_ => Err(BatchError::UnsupportedFunction(
"Do not support other join types!".to_string(),
None.into(),
)
.into()),
}?;
Expand All @@ -340,6 +340,7 @@ impl NestedLoopJoinExecutor {
let (mut left_data_chunk, return_data_chunk) = self
.chunk_builder
.append_chunk(SlicedDataChunk::new_checked(ret_chunk)?)?;

// We have already checked that the last chunk is None. Now swap to buffer it.
std::mem::swap(&mut self.last_chunk, &mut left_data_chunk);
if let Some(inner_chunk) = return_data_chunk {
Expand All @@ -364,9 +365,8 @@ impl NestedLoopJoinExecutor {
match self.join_type {
JoinType::RightOuter => self.do_probe_remaining_right_outer(),
JoinType::RightAnti => self.do_probe_remaining_right_anti(),
_ => Err(ErrorCode::NotImplemented(
_ => Err(BatchError::UnsupportedFunction(
"unsupported type for probe_remaining".to_string(),
None.into(),
)
.into()),
}
Expand Down Expand Up @@ -585,10 +585,9 @@ impl NestedLoopJoinExecutor {
(Vis::Compact(_), _) => right.vis().clone(),
(_, Vis::Compact(_)) => left.vis().clone(),
(Vis::Bitmap(_), Vis::Bitmap(_)) => {
return Err(ErrorCode::NotImplemented(
return Err(BatchError::UnsupportedFunction(
"The concatenate behaviour of two chunk with visibility is undefined"
.to_string(),
None.into(),
)
.into())
}
Expand Down
11 changes: 6 additions & 5 deletions src/batch/src/executor/join/sort_merge_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,14 @@ use std::cmp::Ordering;
use futures_async_stream::try_stream;
use risingwave_common::array::{DataChunk, Row, RowRef};
use risingwave_common::catalog::Schema;
use risingwave_common::error::{ErrorCode, RwError};
use risingwave_common::error::RwError;
use risingwave_common::types::to_datum_ref;
use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
use risingwave_common::util::sort_util::OrderType;
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::plan_common::OrderType as OrderTypeProst;

use crate::error::BatchError;
use crate::executor::join::row_level_iter::RowLevelIter;
use crate::executor::join::JoinType;
use crate::executor::{
Expand Down Expand Up @@ -306,10 +307,10 @@ impl BoxedExecutorBuilder for SortMergeJoinExecutor {
"SortMergeJoinExecutor2".to_string(),
)))
}
_ => Err(ErrorCode::NotImplemented(
format!("Do not support {:?} join type now.", join_type),
None.into(),
)
_ => Err(BatchError::UnsupportedFunction(format!(
"Do not support {:?} join type now.",
join_type
))
.into()),
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/batch/src/executor/merge_sort_exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,6 @@ mod tests {
assert_eq!(col0.array().as_int32().value_at(5), Some(3));
}
let res = stream.next().await;
assert_eq!(res, None);
assert!(res.is_none());
}
}
5 changes: 2 additions & 3 deletions src/batch/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use anyhow::anyhow;
mod delete;
mod expand;
mod filter;
Expand Down Expand Up @@ -52,7 +52,6 @@ pub use order_by::*;
pub use project::*;
use risingwave_common::array::DataChunk;
use risingwave_common::catalog::Schema;
use risingwave_common::error::ErrorCode::InternalError;
use risingwave_common::error::Result;
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::PlanNode;
Expand Down Expand Up @@ -151,7 +150,7 @@ impl<'a, C: Clone> ExecutorBuilder<'a, C> {
impl<'a, C: BatchTaskContext> ExecutorBuilder<'a, C> {
pub async fn build(&self) -> Result<BoxedExecutor> {
self.try_build().await.map_err(|e| {
InternalError(format!(
anyhow!(format!(
"[PlanNode: {:?}] Failed to build executor: {}",
self.plan_node.get_node_body(),
e,
Expand Down
Loading

0 comments on commit 67dbe53

Please sign in to comment.