Skip to content

Commit

Permalink
refactor(meta): eliminate usages of RwError (#4364)
Browse files Browse the repository at this point in the history
* add MetaError

* fix unit tests

* cargo fmt

* add MetaError::permission_denied

* cargo clippy

* use tonic::Status::permission_denied

* simplify code

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
HuaHuaY and mergify[bot] authored Aug 3, 2022
1 parent ba66d65 commit 38173fc
Show file tree
Hide file tree
Showing 37 changed files with 613 additions and 660 deletions.
2 changes: 1 addition & 1 deletion src/batch/src/executor/row_seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ impl BoxedExecutorBuilder for RowSeqScanExecutorBuilder {

let distribution = match &seq_scan_node.vnode_bitmap {
Some(vnodes) => Distribution {
vnodes: Bitmap::try_from(vnodes).unwrap().into(),
vnodes: Bitmap::from(vnodes).into(),
dist_key_indices,
},
// This is possbile for dml. vnode_bitmap is not filled by scheduler.
Expand Down
10 changes: 5 additions & 5 deletions src/common/src/array/column_proto_readers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ pub fn read_numeric_array<T: PrimitiveArrayItemType, R: PrimitiveValueReader<T>>
);

let mut builder = PrimitiveArrayBuilder::<T>::new(cardinality);
let bitmap: Bitmap = array.get_null_bitmap()?.try_into()?;
let bitmap: Bitmap = array.get_null_bitmap()?.into();
let mut cursor = Cursor::new(buf);
for not_null in bitmap.iter() {
if not_null {
Expand All @@ -69,8 +69,8 @@ pub fn read_bool_array(array: &ProstArray, cardinality: usize) -> ArrayResult<Ar
"Must have only 1 buffer in a bool array"
);

let data = (&array.get_values()[0]).try_into()?;
let bitmap: Bitmap = array.get_null_bitmap()?.try_into()?;
let data = (&array.get_values()[0]).into();
let bitmap: Bitmap = array.get_null_bitmap()?.into();

let arr = BoolArray::new(bitmap, data);
assert_eq!(arr.len(), cardinality);
Expand Down Expand Up @@ -127,7 +127,7 @@ macro_rules! read_one_value_array {
let buf = array.get_values()[0].get_body().as_slice();

let mut builder = $builder::new(cardinality);
let bitmap: Bitmap = array.get_null_bitmap()?.try_into()?;
let bitmap: Bitmap = array.get_null_bitmap()?.into();
let mut cursor = Cursor::new(buf);
for not_null in bitmap.iter() {
if not_null {
Expand Down Expand Up @@ -170,7 +170,7 @@ pub fn read_string_array<B: ArrayBuilder, R: VarSizedValueReader<B>>(
let data_buf = array.get_values()[1].get_body().as_slice();

let mut builder = B::with_meta(cardinality, ArrayMeta::Simple);
let bitmap: Bitmap = array.get_null_bitmap()?.try_into()?;
let bitmap: Bitmap = array.get_null_bitmap()?.into();
let mut offset_cursor = Cursor::new(offset_buff);
let mut data_cursor = Cursor::new(data_buf);
let mut prev_offset: i64 = -1;
Expand Down
2 changes: 1 addition & 1 deletion src/common/src/array/list_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ impl ListArray {
array.values.is_empty(),
"Must have no buffer in a list array"
);
let bitmap: Bitmap = array.get_null_bitmap()?.try_into()?;
let bitmap: Bitmap = array.get_null_bitmap()?.into();
let cardinality = bitmap.len();
let array_data = array.get_list_array_data()?.to_owned();
let value = ArrayImpl::from_protobuf(array_data.value.as_ref().unwrap(), cardinality)?;
Expand Down
2 changes: 1 addition & 1 deletion src/common/src/array/struct_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ impl StructArray {
array.values.is_empty(),
"Must have no buffer in a struct array"
);
let bitmap: Bitmap = array.get_null_bitmap()?.try_into()?;
let bitmap: Bitmap = array.get_null_bitmap()?.into();
let cardinality = bitmap.len();
let array_data = array.get_struct_array_data()?;
let children = array_data
Expand Down
9 changes: 3 additions & 6 deletions src/common/src/buffer/bitmap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ use itertools::Itertools;
use risingwave_pb::common::buffer::CompressionType;
use risingwave_pb::common::Buffer as ProstBuffer;

use crate::array::error::ArrayError;
use crate::array::ArrayResult;
use crate::util::bit_util;

Expand Down Expand Up @@ -329,15 +328,13 @@ impl Bitmap {
}
}

impl TryFrom<&ProstBuffer> for Bitmap {
type Error = ArrayError;

fn try_from(buf: &ProstBuffer) -> ArrayResult<Bitmap> {
impl From<&ProstBuffer> for Bitmap {
fn from(buf: &ProstBuffer) -> Self {
let last_byte_num_bits = u8::from_be_bytes(buf.body[..1].try_into().unwrap());
let bits = Bytes::copy_from_slice(&buf.body[1..]); // TODO: avoid this allocation
let num_bits = (bits.len() << 3) - ((8 - last_byte_num_bits) % 8) as usize;

Ok(Self::from_bytes_with_num_bits(bits, num_bits))
Self::from_bytes_with_num_bits(bits, num_bits)
}
}

Expand Down
6 changes: 0 additions & 6 deletions src/common/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,12 +218,6 @@ impl From<JoinError> for RwError {
}
}

impl From<prost::DecodeError> for RwError {
fn from(prost_error: prost::DecodeError) -> Self {
ErrorCode::ProstError(prost_error).into()
}
}

impl From<MemComparableError> for RwError {
fn from(mem_comparable_error: MemComparableError) -> Self {
ErrorCode::MemComparableError(mem_comparable_error).into()
Expand Down
8 changes: 4 additions & 4 deletions src/meta/src/barrier/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use std::sync::Arc;

use futures::future::try_join_all;
use risingwave_common::catalog::TableId;
use risingwave_common::error::{Result, RwError};
use risingwave_common::util::epoch::Epoch;
use risingwave_connector::source::SplitImpl;
use risingwave_pb::source::{ConnectorSplit, ConnectorSplits};
Expand All @@ -39,6 +38,7 @@ use crate::barrier::CommandChanges;
use crate::model::{ActorId, DispatcherId, FragmentId, TableFragments};
use crate::storage::MetaStore;
use crate::stream::FragmentManagerRef;
use crate::{MetaError, MetaResult};

/// [`Reschedule`] is for the [`Command::RescheduleFragment`], which is used for rescheduling actors
/// in some fragment, like scaling or migrating.
Expand Down Expand Up @@ -188,7 +188,7 @@ where
S: MetaStore,
{
/// Generate a mutation for the given command.
pub async fn to_mutation(&self) -> Result<Option<Mutation>> {
pub async fn to_mutation(&self) -> MetaResult<Option<Mutation>> {
let mutation = match &self.command {
Command::Plain(mutation) => mutation.clone(),

Expand Down Expand Up @@ -322,7 +322,7 @@ where
}

/// Do some stuffs after barriers are collected, for the given command.
pub async fn post_collect(&self) -> Result<()> {
pub async fn post_collect(&self) -> MetaResult<()> {
match &self.command {
Command::Plain(_) => {}

Expand All @@ -341,7 +341,7 @@ where
};
client.drop_actors(request).await?;

Ok::<_, RwError>(())
Ok::<_, MetaError>(())
}
});

Expand Down
41 changes: 19 additions & 22 deletions src/meta/src/barrier/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@ use std::mem::take;
use std::sync::Arc;
use std::time::{Duration, Instant};

use anyhow::anyhow;
use fail::fail_point;
use futures::future::try_join_all;
use itertools::Itertools;
use log::debug;
use prometheus::HistogramTimer;
use risingwave_common::bail;
use risingwave_common::catalog::TableId;
use risingwave_common::error::{ErrorCode, Result, RwError};
use risingwave_common::util::epoch::{Epoch, INVALID_EPOCH};
use risingwave_hummock_sdk::{HummockSstableId, LocalSstableInfo};
use risingwave_pb::common::worker_node::State::Running;
Expand Down Expand Up @@ -54,6 +55,7 @@ use crate::model::{ActorId, BarrierManagerState};
use crate::rpc::metrics::MetaMetrics;
use crate::storage::MetaStore;
use crate::stream::FragmentManagerRef;
use crate::{MetaError, MetaResult};

mod command;
mod info;
Expand Down Expand Up @@ -159,9 +161,7 @@ impl ScheduledBarriers {
let mut buffer = self.buffer.write().await;
while let Some((_, notifiers)) = buffer.pop_front() {
notifiers.into_iter().for_each(|notify| {
notify.notify_collection_failed(RwError::from(ErrorCode::InternalError(
"Scheduled barrier abort.".to_string(),
)))
notify.notify_collection_failed(anyhow!("Scheduled barrier abort.").into())
})
}
}
Expand Down Expand Up @@ -348,7 +348,7 @@ where
fn complete(
&mut self,
prev_epoch: u64,
result: Result<Vec<BarrierCompleteResponse>>,
result: MetaResult<Vec<BarrierCompleteResponse>>,
) -> Vec<EpochNode<S>> {
// change state to complete, and wait for nodes with the smaller epoch to commit
let wait_commit_timer = self.metrics.barrier_wait_commit_latency.start_timer();
Expand Down Expand Up @@ -445,13 +445,12 @@ pub struct EpochNode<S: MetaStore> {
}

/// The state of barrier.
#[derive(PartialEq)]
enum BarrierEpochState {
/// This barrier is current in-flight on the stream graph of compute nodes.
InFlight,

/// This barrier is completed or failed.
Completed(Result<Vec<BarrierCompleteResponse>>),
Completed(MetaResult<Vec<BarrierCompleteResponse>>),
}

impl<S> GlobalBarrierManager<S>
Expand Down Expand Up @@ -494,7 +493,7 @@ where
}

/// Flush means waiting for the next barrier to collect.
pub async fn flush(&self) -> Result<()> {
pub async fn flush(&self) -> MetaResult<()> {
let start = Instant::now();

debug!("start barrier flush");
Expand Down Expand Up @@ -623,7 +622,7 @@ where
async fn inject_and_send_err(
&self,
command_context: Arc<CommandContext<S>>,
barrier_complete_tx: UnboundedSender<(u64, Result<Vec<BarrierCompleteResponse>>)>,
barrier_complete_tx: UnboundedSender<(u64, MetaResult<Vec<BarrierCompleteResponse>>)>,
) {
let result = self
.inject_barrier(command_context.clone(), barrier_complete_tx.clone())
Expand All @@ -640,11 +639,9 @@ where
async fn inject_barrier(
&self,
command_context: Arc<CommandContext<S>>,
barrier_complete_tx: UnboundedSender<(u64, Result<Vec<BarrierCompleteResponse>>)>,
) -> Result<()> {
fail_point!("inject_barrier_err", |_| Err(RwError::from(
ErrorCode::InternalError("inject_barrier_err".to_string(),)
)));
barrier_complete_tx: UnboundedSender<(u64, MetaResult<Vec<BarrierCompleteResponse>>)>,
) -> MetaResult<()> {
fail_point!("inject_barrier_err", |_| bail!("inject_barrier_err"));
let mutation = command_context.to_mutation().await?;
let info = command_context.info.clone();
let mut node_need_collect = HashMap::new();
Expand Down Expand Up @@ -688,7 +685,7 @@ where
.inject_barrier(request)
.await
.map(tonic::Response::<_>::into_inner)
.map_err(RwError::from)
.map_err(MetaError::from)
}
.into()
}
Expand Down Expand Up @@ -720,7 +717,7 @@ where
.barrier_complete(request)
.await
.map(tonic::Response::<_>::into_inner)
.map_err(RwError::from)
.map_err(MetaError::from)
}
.into()
}
Expand All @@ -737,7 +734,7 @@ where
async fn barrier_complete_and_commit(
&self,
prev_epoch: u64,
result: Result<Vec<BarrierCompleteResponse>>,
result: MetaResult<Vec<BarrierCompleteResponse>>,
state: &mut BarrierManagerState,
tracker: &mut CreateMviewProgressTracker,
checkpoint_control: &mut CheckpointControl<S>,
Expand Down Expand Up @@ -798,7 +795,7 @@ where
&self,
node: &mut EpochNode<S>,
tracker: &mut CreateMviewProgressTracker,
) -> Result<()> {
) -> MetaResult<()> {
let prev_epoch = node.command_ctx.prev_epoch.0;

match &node.state {
Expand Down Expand Up @@ -889,9 +886,9 @@ where

/// Run multiple commands and return when they're all completely finished. It's ensured that
/// multiple commands is executed continuously and atomically.
pub async fn run_multiple_commands(&self, commands: Vec<Command>) -> Result<()> {
pub async fn run_multiple_commands(&self, commands: Vec<Command>) -> MetaResult<()> {
struct Context {
collect_rx: Receiver<Result<()>>,
collect_rx: Receiver<MetaResult<()>>,
finish_rx: Receiver<()>,
is_create_mv: bool,
}
Expand Down Expand Up @@ -946,13 +943,13 @@ where
}

/// Run a command and return when it's completely finished.
pub async fn run_command(&self, command: Command) -> Result<()> {
pub async fn run_command(&self, command: Command) -> MetaResult<()> {
self.run_multiple_commands(vec![command]).await
}

/// Wait for the next barrier to collect. Note that the barrier flowing in our stream graph is
/// ignored, if exists.
pub async fn wait_for_next_barrier_to_collect(&self) -> Result<()> {
pub async fn wait_for_next_barrier_to_collect(&self) -> MetaResult<()> {
let (tx, rx) = oneshot::channel();
let notifier = Notifier {
collected: Some(tx),
Expand Down
7 changes: 4 additions & 3 deletions src/meta/src/barrier/notifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,18 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_common::error::{Result, RwError};
use tokio::sync::oneshot;

use crate::{MetaError, MetaResult};

/// Used for notifying the status of a scheduled command/barrier.
#[derive(Debug, Default)]
pub(super) struct Notifier {
/// Get notified when scheduled barrier is about to send.
pub to_send: Option<oneshot::Sender<()>>,

/// Get notified when scheduled barrier is collected or failed.
pub collected: Option<oneshot::Sender<Result<()>>>,
pub collected: Option<oneshot::Sender<MetaResult<()>>>,

/// Get notified when scheduled barrier is finished.
pub finished: Option<oneshot::Sender<()>>,
Expand All @@ -44,7 +45,7 @@ impl Notifier {
}

/// Notify when we failed to collect a barrier. This function consumes `self`.
pub fn notify_collection_failed(self, err: RwError) {
pub fn notify_collection_failed(self, err: MetaError) {
if let Some(tx) = self.collected {
tx.send(Err(err)).ok();
}
Expand Down
Loading

0 comments on commit 38173fc

Please sign in to comment.