From b5f389b5f1735ec5aa5c1c9b5492cc87ffe60f6b Mon Sep 17 00:00:00 2001 From: BingqingLyu Date: Tue, 14 Nov 2023 11:59:02 +0800 Subject: [PATCH] replace `ok_or` by `ok_or_else` for lazy evaluation --- .../assembly/v6d/src/bin/gaia_executor.rs | 12 +-- .../executor/common/dyn_type/src/object.rs | 8 +- .../executor/common/dyn_type/src/serde.rs | 73 ++++++++++------- .../rust/client/src/physical_builder.rs | 12 +-- .../executor/ir/common/src/utils.rs | 30 +++---- .../ir/core/src/glogue/extend_step.rs | 4 +- .../executor/ir/core/src/glogue/pattern.rs | 47 ++++++----- .../executor/ir/core/src/plan/logical.rs | 25 +++--- .../executor/ir/core/src/plan/patmat.rs | 2 +- .../executor/ir/core/src/plan/physical.rs | 61 +++++++------- .../src/adapters/gs_store/partitioner.rs | 20 +++-- .../ir/graph_proxy/src/apis/cluster_info.rs | 8 +- .../src/apis/graph/element/path.rs | 2 +- .../ir/graph_proxy/src/utils/expr/eval.rs | 43 ++++++---- .../graph_proxy/src/utils/expr/eval_pred.rs | 8 +- .../executor/ir/runtime/src/assembly.rs | 81 +++++++++---------- .../src/process/operator/accum/accum.rs | 8 +- .../src/process/operator/accum/sample.rs | 9 +-- .../src/process/operator/filter/coin.rs | 9 +-- .../process/operator/flatmap/edge_expand.rs | 4 +- .../src/process/operator/flatmap/unfold.rs | 28 ++++--- .../src/process/operator/group/group.rs | 6 +- .../process/operator/map/expand_intersect.rs | 20 ++--- .../runtime/src/process/operator/map/get_v.rs | 39 +++++---- .../src/process/operator/map/path_end.rs | 10 ++- .../src/process/operator/map/path_start.rs | 8 +- .../src/process/operator/map/project.rs | 26 +++--- .../ir/runtime/src/process/operator/mod.rs | 8 +- .../runtime/src/process/operator/sink/mod.rs | 2 +- .../runtime/src/process/operator/sink/sink.rs | 9 ++- .../process/operator/sink/sink_vineyard.rs | 71 ++++++++-------- .../runtime/src/process/operator/sort/sort.rs | 2 +- .../ir/runtime/src/process/operator/source.rs | 2 +- .../src/process/operator/subtask/apply.rs | 16 ++-- .../executor/store/exp_store/src/config.rs | 4 +- .../executor/store/exp_store/src/ldbc.rs | 24 +++--- 36 files changed, 381 insertions(+), 360 deletions(-) diff --git a/interactive_engine/executor/assembly/v6d/src/bin/gaia_executor.rs b/interactive_engine/executor/assembly/v6d/src/bin/gaia_executor.rs index 1f5b3a9cff4e..3fe78c3b1537 100644 --- a/interactive_engine/executor/assembly/v6d/src/bin/gaia_executor.rs +++ b/interactive_engine/executor/assembly/v6d/src/bin/gaia_executor.rs @@ -43,28 +43,28 @@ async fn main() -> Result<(), Box> { let config_map: HashMap<_, _> = parsed.into_iter().collect(); let rpc_port: u16 = config_map .get("rpc.port") - .ok_or(StartServerError::empty_config_error("rpc.port"))? + .ok_or_else(|| StartServerError::empty_config_error("rpc.port"))? .parse()?; let server_id: u64 = config_map .get("server.id") - .ok_or(StartServerError::empty_config_error("server.id"))? + .ok_or_else(|| StartServerError::empty_config_error("server.id"))? .parse()?; let server_size: usize = config_map .get("server.size") - .ok_or(StartServerError::empty_config_error("server.size"))? + .ok_or_else(|| StartServerError::empty_config_error("server.size"))? .parse()?; let hosts: Vec<&str> = config_map .get("network.servers") - .ok_or(StartServerError::empty_config_error("network.servers"))? + .ok_or_else(|| StartServerError::empty_config_error("network.servers"))? .split(",") .collect(); let worker_thread_num: i32 = config_map .get("pegasus.worker.num") - .ok_or(StartServerError::empty_config_error("pegasus.worker.num"))? + .ok_or_else(|| StartServerError::empty_config_error("pegasus.worker.num"))? .parse()?; let vineyard_graph_id: i64 = config_map .get("graph.vineyard.object.id") - .ok_or(StartServerError::empty_config_error("graph.vineyard.object.id"))? + .ok_or_else(|| StartServerError::empty_config_error("graph.vineyard.object.id"))? .parse()?; assert_eq!(server_size, hosts.len()); diff --git a/interactive_engine/executor/common/dyn_type/src/object.rs b/interactive_engine/executor/common/dyn_type/src/object.rs index b15db7c25ef0..917409d25153 100644 --- a/interactive_engine/executor/common/dyn_type/src/object.rs +++ b/interactive_engine/executor/common/dyn_type/src/object.rs @@ -441,7 +441,7 @@ impl DateTimeFormats { pub fn from_date32(date32: i32) -> Result { NaiveDate::from_ymd_opt(date32 / 10000, ((date32 % 10000) / 100) as u32, (date32 % 100) as u32) .map(|d| DateTimeFormats::Date(d)) - .ok_or(CastError::new::(RawType::Integer)) + .ok_or_else(|| CastError::new::(RawType::Integer)) } // the time32 is stored HHMMSSsss, e.g., 121314100 @@ -453,13 +453,13 @@ impl DateTimeFormats { (time32 % 1000) as u32, ) .map(|t| DateTimeFormats::Time(t)) - .ok_or(CastError::new::(RawType::Integer)) + .ok_or_else(|| CastError::new::(RawType::Integer)) } pub fn from_timestamp_millis(timestamp: i64) -> Result { NaiveDateTime::from_timestamp_millis(timestamp) .map(|dt| DateTimeFormats::DateTime(dt)) - .ok_or(CastError::new::(RawType::Long)) + .ok_or_else(|| CastError::new::(RawType::Long)) } // we pre-assume some date/time/datetime formats according to ISO formats. @@ -522,7 +522,7 @@ impl DateTimeFormats { DateTimeFormats::DateTime(dt) => dt .and_local_timezone(FixedOffset::east_opt(0).unwrap()) .single() - .ok_or(CastError::new::(RawType::DateTimeWithTz)), + .ok_or_else(|| CastError::new::(RawType::DateTimeWithTz)), DateTimeFormats::DateTimeWithTz(dt) => Ok(*dt), } } diff --git a/interactive_engine/executor/common/dyn_type/src/serde.rs b/interactive_engine/executor/common/dyn_type/src/serde.rs index c396246bccaf..c820eb59a81e 100644 --- a/interactive_engine/executor/common/dyn_type/src/serde.rs +++ b/interactive_engine/executor/common/dyn_type/src/serde.rs @@ -133,12 +133,13 @@ impl Decode for DateTimeFormats { let year = ::read_from(reader)?; let month = ::read_from(reader)?; let day = ::read_from(reader)?; - let date = chrono::NaiveDate::from_ymd_opt(year as i32, month as u32, day as u32).ok_or( - io::Error::new( - io::ErrorKind::Other, - format!("invalid date {:?}-{:?}-{:?}", year, month, day), - ), - )?; + let date = chrono::NaiveDate::from_ymd_opt(year as i32, month as u32, day as u32) + .ok_or_else(|| { + io::Error::new( + io::ErrorKind::Other, + format!("invalid date {:?}-{:?}-{:?}", year, month, day), + ) + })?; Ok(DateTimeFormats::Date(date)) } 1 => { @@ -148,41 +149,55 @@ impl Decode for DateTimeFormats { let nano = ::read_from(reader)?; let time = chrono::NaiveTime::from_hms_nano_opt(hour as u32, minute as u32, second as u32, nano) - .ok_or(io::Error::new( - io::ErrorKind::Other, - format!("invalid time {:?}:{:?}:{:?}.{:?}", hour, minute, second, nano / 1000_000), - ))?; + .ok_or_else(|| { + io::Error::new( + io::ErrorKind::Other, + format!( + "invalid time {:?}:{:?}:{:?}.{:?}", + hour, + minute, + second, + nano / 1000_000 + ), + ) + })?; Ok(DateTimeFormats::Time(time)) } 2 => { let timestamp_millis = ::read_from(reader)?; - let date_time = chrono::NaiveDateTime::from_timestamp_millis(timestamp_millis).ok_or( - io::Error::new( - io::ErrorKind::Other, - format!("invalid datetime {:?}", timestamp_millis), - ), - )?; + let date_time = + chrono::NaiveDateTime::from_timestamp_millis(timestamp_millis).ok_or_else(|| { + io::Error::new( + io::ErrorKind::Other, + format!("invalid datetime {:?}", timestamp_millis), + ) + })?; Ok(DateTimeFormats::DateTime(date_time)) } 3 => { let native_local_timestamp_millis = ::read_from(reader)?; let offset = ::read_from(reader)?; - let tz = chrono::FixedOffset::east_opt(offset) - .ok_or(io::Error::new(io::ErrorKind::Other, format!("invalid offset {:?}", offset)))?; + let tz = chrono::FixedOffset::east_opt(offset).ok_or_else(|| { + io::Error::new(io::ErrorKind::Other, format!("invalid offset {:?}", offset)) + })?; let date_time = chrono::NaiveDateTime::from_timestamp_millis(native_local_timestamp_millis) - .ok_or(io::Error::new( - io::ErrorKind::Other, - format!("invalid datetime {:?}", native_local_timestamp_millis), - ))? + .ok_or_else(|| { + io::Error::new( + io::ErrorKind::Other, + format!("invalid datetime {:?}", native_local_timestamp_millis), + ) + })? .and_local_timezone(tz) .single() - .ok_or(io::Error::new( - io::ErrorKind::Other, - format!( - "invalid datetime with timezone {:?} {:?}", - native_local_timestamp_millis, tz - ), - ))?; + .ok_or_else(|| { + io::Error::new( + io::ErrorKind::Other, + format!( + "invalid datetime with timezone {:?} {:?}", + native_local_timestamp_millis, tz + ), + ) + })?; Ok(DateTimeFormats::DateTimeWithTz(date_time)) } diff --git a/interactive_engine/executor/ir/clients/rust/client/src/physical_builder.rs b/interactive_engine/executor/ir/clients/rust/client/src/physical_builder.rs index 426a563962c4..771df129776d 100644 --- a/interactive_engine/executor/ir/clients/rust/client/src/physical_builder.rs +++ b/interactive_engine/executor/ir/clients/rust/client/src/physical_builder.rs @@ -35,7 +35,7 @@ pub struct PlanBuilder { impl Default for PlanBuilder { fn default() -> Self { - PlanBuilder {id: DEFAULT_PLAN_ID, plan: vec![] } + PlanBuilder { id: DEFAULT_PLAN_ID, plan: vec![] } } } @@ -165,7 +165,7 @@ impl PlanBuilder { let apply = pb::Apply { join_kind: unsafe { ::std::mem::transmute(join_kind) }, keys: vec![], - sub_plan: Some(pb::PhysicalPlan { plan: sub_plan.take(), plan_id: DEFAULT_PLAN_ID }), + sub_plan: Some(pb::PhysicalPlan { plan: sub_plan.take(), plan_id: DEFAULT_PLAN_ID }), alias, }; let op = pb::physical_opr::operator::OpKind::Apply(apply); @@ -221,7 +221,7 @@ impl PlanBuilder { left_keys, right_keys, join_kind: unsafe { ::std::mem::transmute(join_kind) }, - left_plan: Some(pb::PhysicalPlan { plan: left_plan.take() , plan_id: DEFAULT_PLAN_ID}), + left_plan: Some(pb::PhysicalPlan { plan: left_plan.take(), plan_id: DEFAULT_PLAN_ID }), right_plan: Some(pb::PhysicalPlan { plan: right_plan.take(), plan_id: DEFAULT_PLAN_ID }), }; let op = pb::physical_opr::operator::OpKind::Join(join); @@ -247,7 +247,7 @@ impl PlanBuilder { pub fn union(&mut self, mut plans: Vec) -> &mut Self { let mut sub_plans = vec![]; for plan in plans.drain(..) { - sub_plans.push(pb::PhysicalPlan { plan: plan.take() , plan_id: DEFAULT_PLAN_ID}); + sub_plans.push(pb::PhysicalPlan { plan: plan.take(), plan_id: DEFAULT_PLAN_ID }); } let union = pb::Union { sub_plans }; let op = pb::physical_opr::operator::OpKind::Union(union); @@ -259,7 +259,7 @@ impl PlanBuilder { let key = key.try_into().unwrap(); let mut sub_plans = vec![]; for plan in plans.drain(..) { - sub_plans.push(pb::PhysicalPlan { plan: plan.take() , plan_id: DEFAULT_PLAN_ID}); + sub_plans.push(pb::PhysicalPlan { plan: plan.take(), plan_id: DEFAULT_PLAN_ID }); } let intersect = pb::Intersect { sub_plans, key }; let op = pb::physical_opr::operator::OpKind::Intersect(intersect); @@ -331,7 +331,7 @@ impl PlanBuilder { } pub fn build(self) -> pb::PhysicalPlan { - pb::PhysicalPlan { plan: self.plan , plan_id: self.id} + pb::PhysicalPlan { plan: self.plan, plan_id: self.id } } } diff --git a/interactive_engine/executor/ir/common/src/utils.rs b/interactive_engine/executor/ir/common/src/utils.rs index 90373ed21536..4401478fe8c0 100644 --- a/interactive_engine/executor/ir/common/src/utils.rs +++ b/interactive_engine/executor/ir/common/src/utils.rs @@ -394,14 +394,16 @@ impl TryFrom for Vec { let predicate = and_predicate .predicates .get(0) - .ok_or(ParsePbError::EmptyFieldError("`AndCondition` is emtpy".to_string()))?; + .ok_or_else(|| ParsePbError::EmptyFieldError("`AndCondition` is emtpy".to_string()))?; let (key, value) = (predicate.key.as_ref(), predicate.value.as_ref()); - let key = key.ok_or("key is empty in kv_pair in indexed_scan")?; + let key = key.ok_or_else(|| { + ParsePbError::EmptyFieldError("key is empty in kv_pair in indexed_scan".to_string()) + })?; if let Some(common_pb::property::Item::Id(_id_key)) = key.item.as_ref() { - let value_item = value.ok_or(ParsePbError::EmptyFieldError( - "`Value` is empty in kv_pair in indexed_scan".to_string(), - ))?; + let value_item = value.ok_or_else(|| { + ParsePbError::EmptyFieldError("`Value` is empty in kv_pair in indexed_scan".to_string()) + })?; match value_item { pb::index_predicate::triplet::Value::Const(value) => match value.item.as_ref() { @@ -440,14 +442,12 @@ impl TryFrom for Vec> { // PkValue can be one-column or multi-columns, which is a set of and_conditions. let mut primary_key_value = Vec::with_capacity(and_predicates.predicates.len()); for predicate in &and_predicates.predicates { - let key_pb = predicate - .key - .clone() - .ok_or("key is empty in kv_pair in indexed_scan")?; - let value_pb = predicate - .value - .clone() - .ok_or("value is empty in kv_pair in indexed_scan")?; + let key_pb = predicate.key.clone().ok_or_else(|| { + ParsePbError::EmptyFieldError("key is empty in kv_pair in indexed_scan".to_string()) + })?; + let value_pb = predicate.value.clone().ok_or_else(|| { + ParsePbError::EmptyFieldError("value is empty in kv_pair in indexed_scan".to_string()) + })?; let key = match key_pb.item { Some(common_pb::property::Item::Key(prop_key)) => prop_key.try_into()?, _ => Err(ParsePbError::Unsupported( @@ -909,9 +909,9 @@ impl TryFrom for physical_pb::physical_opr::operator:: fn try_from(op: PhysicalOpr) -> Result { let op_kind = op .opr - .ok_or(ParsePbError::EmptyFieldError("algebra op is empty".to_string()))? + .ok_or_else(|| ParsePbError::EmptyFieldError("algebra op is empty".to_string()))? .op_kind - .ok_or(ParsePbError::EmptyFieldError("algebra op_kind is empty".to_string()))?; + .ok_or_else(|| ParsePbError::EmptyFieldError("algebra op_kind is empty".to_string()))?; Ok(op_kind) } } diff --git a/interactive_engine/executor/ir/core/src/glogue/extend_step.rs b/interactive_engine/executor/ir/core/src/glogue/extend_step.rs index ab1c5e449270..90a876e991ec 100644 --- a/interactive_engine/executor/ir/core/src/glogue/extend_step.rs +++ b/interactive_engine/executor/ir/core/src/glogue/extend_step.rs @@ -69,10 +69,10 @@ impl ExactExtendEdge { path_opr .base .as_mut() - .ok_or(ParsePbError::EmptyFieldError("PathExpand::base in Pattern".to_string()))? + .ok_or_else(|| ParsePbError::EmptyFieldError("PathExpand::base in Pattern".to_string()))? .edge_expand .as_mut() - .ok_or(ParsePbError::EmptyFieldError("PathExpand::base in Pattern".to_string()))? + .ok_or_else(|| ParsePbError::EmptyFieldError("PathExpand::base in Pattern".to_string()))? .direction = self.dir as i32; Ok(path_opr.into()) } diff --git a/interactive_engine/executor/ir/core/src/glogue/pattern.rs b/interactive_engine/executor/ir/core/src/glogue/pattern.rs index d8935af4c11c..3197ce86fa36 100644 --- a/interactive_engine/executor/ir/core/src/glogue/pattern.rs +++ b/interactive_engine/executor/ir/core/src/glogue/pattern.rs @@ -334,12 +334,9 @@ impl Pattern { return Err(IrPatternError::Unsupported(format!("Only support join_kind of `InnerJoin` in ExtendStrategy in Pattern Match, while the join_kind is {:?}", sentence.join_kind)).into()); } // pb pattern sentence must have start tag - let start_tag = pb_name_or_id_to_id( - sentence - .start - .as_ref() - .ok_or(ParsePbError::EmptyFieldError("pb::Pattern::Sentence::start".to_string()))?, - )?; + let start_tag = pb_name_or_id_to_id(sentence.start.as_ref().ok_or_else(|| { + ParsePbError::EmptyFieldError("pb::Pattern::Sentence::start".to_string()) + })?)?; // just use the start tag id as its pattern vertex id let start_tag_v_id = start_tag as PatternId; // check whether the start tag label is already determined or not @@ -568,11 +565,11 @@ fn build_logical_plan( origin_pattern: &Pattern, mut exact_extend_steps: Vec, ) -> IrPatternResult { let mut match_plan = pb::LogicalPlan::default(); - let source_extend = exact_extend_steps - .pop() - .ok_or(IrPatternError::InvalidExtendPattern( + let source_extend = exact_extend_steps.pop().ok_or_else(|| { + IrPatternError::InvalidExtendPattern( "Build logical plan error: from empty extend steps!".to_string(), - ))?; + ) + })?; append_opr(&mut match_plan, generate_source_operator(origin_pattern, &source_extend)?)?; for exact_extend_step in exact_extend_steps.into_iter().rev() { let edge_expands_num = exact_extend_step.len(); @@ -693,7 +690,7 @@ fn generate_project_operator(pattern: &Pattern) -> IrPatternResult= max_tag_id { let mut mappings = Vec::with_capacity(max_tag_id as usize); @@ -893,11 +890,13 @@ fn get_edge_expand_from_binder<'a, 'b>( let expand_base = path_expand .base .as_ref() - .ok_or(ParsePbError::EmptyFieldError("PathExpand::base in Pattern".to_string()))?; + .ok_or_else(|| (ParsePbError::EmptyFieldError("PathExpand::base in Pattern".to_string())))?; let edge_expand = expand_base .edge_expand .as_ref() - .ok_or(ParsePbError::EmptyFieldError("PathExpand::base::EdgeExpand in Pattern".to_string()))?; + .ok_or_else(|| { + ParsePbError::EmptyFieldError("PathExpand::base::EdgeExpand in Pattern".to_string()) + })?; edge_data_map.insert(edge_id, PbEdgeOrPath::from(path_expand.clone())); Ok(edge_expand) } else if let Some(BinderItem::Edge(edge_expand)) = binder.item.as_ref() { @@ -1177,7 +1176,7 @@ impl Pattern { pub fn get_edge(&self, edge_id: PatternId) -> IrPatternResult<&PatternEdge> { self.edges .get(edge_id) - .ok_or(IrPatternError::MissingPatternEdge(edge_id)) + .ok_or_else(|| (IrPatternError::MissingPatternEdge(edge_id))) } /// Get the total number of edges in the pattern @@ -1209,7 +1208,7 @@ impl Pattern { pub fn get_edge_tag(&self, edge_id: PatternId) -> IrPatternResult> { self.edges_data .get(edge_id) - .ok_or(IrPatternError::MissingPatternEdge(edge_id)) + .ok_or_else(|| (IrPatternError::MissingPatternEdge(edge_id))) .map(|edge_data| edge_data.tag) } @@ -1218,7 +1217,7 @@ impl Pattern { pub fn get_edge_data(&self, edge_id: PatternId) -> IrPatternResult<&PbEdgeOrPath> { self.edges_data .get(edge_id) - .ok_or(IrPatternError::MissingPatternEdge(edge_id)) + .ok_or_else(|| (IrPatternError::MissingPatternEdge(edge_id))) .map(|edge_data| &edge_data.data) } @@ -1236,7 +1235,7 @@ impl Pattern { pub fn get_vertex(&self, vertex_id: PatternId) -> IrPatternResult<&PatternVertex> { self.vertices .get(vertex_id) - .ok_or(IrPatternError::MissingPatternVertex(vertex_id)) + .ok_or_else(|| (IrPatternError::MissingPatternVertex(vertex_id))) } /// Get the total number of vertices in the pattern @@ -1266,7 +1265,7 @@ impl Pattern { pub fn get_vertex_tag(&self, vertex_id: PatternId) -> IrPatternResult> { self.vertices_data .get(vertex_id) - .ok_or(IrPatternError::MissingPatternVertex(vertex_id)) + .ok_or_else(|| (IrPatternError::MissingPatternVertex(vertex_id))) .map(|vertex_data| vertex_data.tag) } @@ -1275,7 +1274,7 @@ impl Pattern { pub fn get_vertex_parameters(&self, vertex_id: PatternId) -> IrPatternResult> { self.vertices_data .get(vertex_id) - .ok_or(IrPatternError::MissingPatternVertex(vertex_id)) + .ok_or_else(|| (IrPatternError::MissingPatternVertex(vertex_id))) .map(|vertex_data| vertex_data.parameters.as_ref()) } @@ -1285,7 +1284,7 @@ impl Pattern { let vertex_data = self .vertices_data .get(vertex_id) - .ok_or(IrPatternError::MissingPatternVertex(vertex_id))?; + .ok_or_else(|| (IrPatternError::MissingPatternVertex(vertex_id)))?; Ok(vertex_data .adjacencies .iter() @@ -1298,7 +1297,7 @@ impl Pattern { pub fn get_vertex_degree(&self, vertex_id: PatternId) -> IrPatternResult { self.vertices_data .get(vertex_id) - .ok_or(IrPatternError::MissingPatternVertex(vertex_id)) + .ok_or_else(|| (IrPatternError::MissingPatternVertex(vertex_id))) .map(|vertex_data| vertex_data.adjacencies.len()) } @@ -1356,7 +1355,7 @@ impl Pattern { let edge_data = self .edges_data .get_mut(edge_id) - .ok_or(IrPatternError::MissingPatternEdge(edge_id))?; + .ok_or_else(|| (IrPatternError::MissingPatternEdge(edge_id)))?; edge_data.data = opr; Ok(()) } @@ -1373,7 +1372,7 @@ impl Pattern { let mut vertex_data = self .vertices_data .remove(vertex_id) - .ok_or(IrPatternError::MissingPatternVertex(vertex_id))?; + .ok_or_else(|| (IrPatternError::MissingPatternVertex(vertex_id)))?; if let Some(existed_params) = vertex_data.parameters { vertex_data.parameters = Some(combine_query_params(existed_params, params)); } else { @@ -1410,7 +1409,7 @@ impl Pattern { // update adjacent vertices' info self.vertices_data .get_mut(adjacent_vertex_id) - .ok_or(IrPatternError::MissingPatternVertex(adjacent_vertex_id))? + .ok_or_else(|| (IrPatternError::MissingPatternVertex(adjacent_vertex_id)))? .remove_adjacency(adjacent_edge_id) } diff --git a/interactive_engine/executor/ir/core/src/plan/logical.rs b/interactive_engine/executor/ir/core/src/plan/logical.rs index dd07996c155a..d5b061bfb799 100644 --- a/interactive_engine/executor/ir/core/src/plan/logical.rs +++ b/interactive_engine/executor/ir/core/src/plan/logical.rs @@ -449,7 +449,7 @@ impl LogicalPlan { id_map .get(&old) .cloned() - .ok_or(IrError::ParentNodeNotExist(old)) + .ok_or_else(|| IrError::ParentNodeNotExist(old)) }) .collect::>>()? }; @@ -473,7 +473,7 @@ impl LogicalPlan { let inner_opr = opr .opr .as_ref() - .ok_or(IrError::MissingData("Operator::opr".to_string()))?; + .ok_or_else(|| IrError::MissingData("Operator::opr".to_string()))?; let is_sink = if let pb::logical_plan::operator::Opr::Sink(_) = inner_opr { true } else { false }; @@ -508,7 +508,7 @@ impl LogicalPlan { let is_pattern_source_whole_graph = self .get_opr(parent_ids[0]) .map(|pattern_source| is_whole_graph(&pattern_source)) - .ok_or(IrError::ParentNodeNotExist(parent_ids[0]))?; + .ok_or_else(|| IrError::ParentNodeNotExist(parent_ids[0]))?; let extend_strategy = if is_pattern_source_whole_graph { ExtendStrategy::init(&pattern, &self.meta) } else { @@ -1028,7 +1028,7 @@ fn preprocess_label( let new_item = common_pb::value::Item::I32( schema .get_table_id(name) - .ok_or(IrError::TableNotExist(NameOrId::Str(name.to_string())))?, + .ok_or_else(|| IrError::TableNotExist(NameOrId::Str(name.to_string())))?, ); debug!("table: {:?} -> {:?}", item, new_item); *item = new_item; @@ -1039,9 +1039,9 @@ fn preprocess_label( .item .iter() .map(|name| { - schema - .get_table_id(name) - .ok_or(IrError::TableNotExist(NameOrId::Str(name.to_string()))) + schema.get_table_id(name).ok_or_else(|| { + IrError::TableNotExist(NameOrId::Str(name.to_string())) + }) }) .collect::>>()?, }); @@ -1123,11 +1123,12 @@ fn preprocess_params( if let Some(schema) = &meta.schema { if schema.is_table_id() { for table in params.tables.iter_mut() { - let new_table = get_table_id_from_pb(schema, table) - .ok_or(IrError::TableNotExist(table.clone().try_into()?))? - .into(); - debug!("table: {:?} -> {:?}", table, new_table); - *table = new_table; + if let Some(new_table) = get_table_id_from_pb(schema, table) { + debug!("table: {:?} -> {:?}", table, new_table); + *table = new_table.into(); + } else { + return Err(IrError::TableNotExist(table.clone().try_into()?)); + } } } } diff --git a/interactive_engine/executor/ir/core/src/plan/patmat.rs b/interactive_engine/executor/ir/core/src/plan/patmat.rs index 4354d40d6cd5..b138da82911f 100644 --- a/interactive_engine/executor/ir/core/src/plan/patmat.rs +++ b/interactive_engine/executor/ir/core/src/plan/patmat.rs @@ -118,7 +118,7 @@ impl TryFrom for BaseSentence { let start_tag: NameOrId = pb .start .clone() - .ok_or(ParsePbError::EmptyFieldError("Pattern::Sentence::start".to_string()))? + .ok_or_else(|| ParsePbError::EmptyFieldError("Pattern::Sentence::start".to_string()))? .try_into()?; let end_tag: Option = pb diff --git a/interactive_engine/executor/ir/core/src/plan/physical.rs b/interactive_engine/executor/ir/core/src/plan/physical.rs index 0ee8e42f844d..f55e451114af 100644 --- a/interactive_engine/executor/ir/core/src/plan/physical.rs +++ b/interactive_engine/executor/ir/core/src/plan/physical.rs @@ -232,7 +232,7 @@ impl AsPhysical for pb::PathExpand { let range = self .hop_range .as_ref() - .ok_or(IrError::MissingData("PathExpand::hop_range".to_string()))?; + .ok_or_else(|| IrError::MissingData("PathExpand::hop_range".to_string()))?; if range.upper <= range.lower || range.lower < 0 || range.upper <= 0 { Err(IrError::InvalidRange(range.lower, range.upper))? } @@ -336,12 +336,12 @@ impl AsPhysical for pb::PathExpand { let expand_base = self .base .as_mut() - .ok_or(IrError::MissingData("PathExpand::base".to_string()))?; + .ok_or_else(|| IrError::MissingData("PathExpand::base".to_string()))?; let getv = expand_base.get_v.as_mut(); let edge_expand = expand_base .edge_expand .as_mut() - .ok_or(IrError::MissingData("PathExpand::base.edge_expand".to_string()))?; + .ok_or_else(|| IrError::MissingData("PathExpand::base.edge_expand".to_string()))?; match result_opt { pb::path_expand::ResultOpt::EndV => { // do nothing @@ -461,10 +461,10 @@ fn build_and_try_fuse_get_v(builder: &mut PlanBuilder, mut get_v: pb::GetV) -> I let op_kind = last_op .opr .as_mut() - .ok_or(IrError::MissingData(format!("PhysicalOpr")))? + .ok_or_else(|| IrError::MissingData(format!("PhysicalOpr")))? .op_kind .as_mut() - .ok_or(IrError::MissingData(format!("PhysicalOpr OpKind")))?; + .ok_or_else(|| IrError::MissingData(format!("PhysicalOpr OpKind")))?; if let physical_pb::physical_opr::operator::OpKind::Edge(ref mut edge) = op_kind { if edge.alias.is_none() { // outE + inV || inE + outV || bothE + otherV @@ -551,7 +551,7 @@ impl AsPhysical for pb::Limit { let range = self .range .as_ref() - .ok_or(IrError::MissingData("Limit::range".to_string()))?; + .ok_or_else(|| IrError::MissingData("Limit::range".to_string()))?; if range.upper <= range.lower || range.lower < 0 || range.upper <= 0 { Err(IrError::InvalidRange(range.lower, range.upper))? } @@ -619,7 +619,7 @@ impl AsPhysical for pb::Sample { let sample_type = sample_type .inner .as_ref() - .ok_or(IrError::MissingData("Sample::sample_type".to_string()))?; + .ok_or_else(|| IrError::MissingData("Sample::sample_type".to_string()))?; match sample_type { pb::sample::sample_type::Inner::SampleByNum(num) => { if num.num <= 0 { @@ -650,11 +650,11 @@ impl AsPhysical for pb::Sink { let target = self .sink_target .as_ref() - .ok_or(IrError::MissingData("Sink::sink_target".to_string()))?; + .ok_or_else(|| IrError::MissingData("Sink::sink_target".to_string()))?; match target .inner .as_ref() - .ok_or(IrError::MissingData("Sink::sink_target::Inner".to_string()))? + .ok_or_else(|| IrError::MissingData("Sink::sink_target::Inner".to_string()))? { pb::sink::sink_target::Inner::SinkDefault(_) => { let tag_id_mapping = plan_meta @@ -876,7 +876,7 @@ impl AsPhysical for LogicalPlan { } else if curr_node.borrow().children.len() >= 2 { let (merge_node, subplans) = self .get_branch_plans(curr_node.clone()) - .ok_or(IrError::MissingData("Branch::merge_node and subplans".to_string()))?; + .ok_or_else(|| IrError::MissingData("Branch::merge_node and subplans".to_string()))?; let mut plans: Vec = vec![]; for subplan in &subplans { let mut sub_bldr = PlanBuilder::default(); @@ -964,7 +964,7 @@ fn add_intersect_job_builder( let intersect_tag = intersect_opr .key .as_ref() - .ok_or(IrError::ParsePbError("Empty tag in `Intersect` opr".into()))?; + .ok_or_else(|| IrError::ParsePbError("Empty tag in `Intersect` opr".into()))?; let mut auxilia: Option = None; let mut intersect_plans: Vec = vec![]; for subplan in subplans { @@ -978,12 +978,12 @@ fn add_intersect_job_builder( )))? } let mut sub_bldr = PlanBuilder::default(); - let first_opr = subplan - .get_first_node() - .ok_or(IrError::InvalidPattern("First node missing for Intersection's subplan".to_string()))?; - let last_opr = subplan - .get_last_node() - .ok_or(IrError::InvalidPattern("Last node Missing for Intersection's subplan".to_string()))?; + let first_opr = subplan.get_first_node().ok_or_else(|| { + IrError::InvalidPattern("First node missing for Intersection's subplan".to_string()) + })?; + let last_opr = subplan.get_last_node().ok_or_else(|| { + IrError::InvalidPattern("Last node Missing for Intersection's subplan".to_string()) + })?; if let Some(Vertex(get_v)) = last_opr.borrow().opr.opr.as_ref() { let mut get_v = get_v.clone(); if get_v.alias.is_none() || !get_v.alias.as_ref().unwrap().eq(intersect_tag) { @@ -1003,18 +1003,18 @@ fn add_intersect_job_builder( // TODO: there might be a bug here: // if path_expand has an alias which indicates that the path would be referred later, it may not as expected. let mut path_expand = path_expand.clone(); - let path_expand_base = path_expand - .base - .as_ref() - .ok_or(ParsePbError::EmptyFieldError("PathExpand::base in Pattern".to_string()))?; + let path_expand_base = path_expand.base.as_ref().ok_or_else(|| { + ParsePbError::EmptyFieldError("PathExpand::base in Pattern".to_string()) + })?; let path_get_v_opt = path_expand_base.get_v.clone(); - let base_edge_expand = - path_expand_base - .edge_expand - .as_ref() - .ok_or(ParsePbError::EmptyFieldError( + let base_edge_expand = path_expand_base + .edge_expand + .as_ref() + .ok_or_else(|| { + ParsePbError::EmptyFieldError( "PathExpand::base::edge_expand in Pattern".to_string(), - ))?; + ) + })?; // Ensure the base is ExpandV or ExpandE + GetV if path_get_v_opt == None && base_edge_expand.expand_opt == pb::edge_expand::ExpandOpt::Edge as i32 @@ -1033,10 +1033,9 @@ fn add_intersect_job_builder( // pick the last edge expand out from the path expand let mut last_edge_expand = base_edge_expand.clone(); last_edge_expand.v_tag = None; - let hop_range = path_expand - .hop_range - .as_mut() - .ok_or(ParsePbError::EmptyFieldError("pb::PathExpand::hop_range".to_string()))?; + let hop_range = path_expand.hop_range.as_mut().ok_or_else(|| { + ParsePbError::EmptyFieldError("pb::PathExpand::hop_range".to_string()) + })?; if hop_range.lower < 1 { Err(IrError::Unsupported(format!( "PathExpand in Intersection with lower range of {:?}", diff --git a/interactive_engine/executor/ir/graph_proxy/src/adapters/gs_store/partitioner.rs b/interactive_engine/executor/ir/graph_proxy/src/adapters/gs_store/partitioner.rs index 62ddb2e2d544..65edd3d62da2 100644 --- a/interactive_engine/executor/ir/graph_proxy/src/adapters/gs_store/partitioner.rs +++ b/interactive_engine/executor/ir/graph_proxy/src/adapters/gs_store/partitioner.rs @@ -42,10 +42,12 @@ impl PartitionInfo for GrootMultiPartition { fn get_server_id(&self, partition_id: PartitionId) -> GraphProxyResult { self.graph_partition_manager .get_server_id(partition_id) - .ok_or(GraphProxyError::query_store_error(&format!( - "get server id failed on Groot with partition_id of {:?}", - partition_id - ))) + .ok_or_else(|| { + GraphProxyError::query_store_error(&format!( + "get server id failed on Groot with partition_id of {:?}", + partition_id + )) + }) } } @@ -91,9 +93,11 @@ impl PartitionInfo for VineyardMultiPartition { self.partition_server_index_mapping .get(&partition_id) .cloned() - .ok_or(GraphProxyError::query_store_error(&format!( - "get server id failed on Vineyard with partition_id of {:?}", - partition_id - ))) + .ok_or_else(|| { + GraphProxyError::query_store_error(&format!( + "get server id failed on Vineyard with partition_id of {:?}", + partition_id + )) + }) } } diff --git a/interactive_engine/executor/ir/graph_proxy/src/apis/cluster_info.rs b/interactive_engine/executor/ir/graph_proxy/src/apis/cluster_info.rs index 00b5a773fc38..25bdf89bb584 100644 --- a/interactive_engine/executor/ir/graph_proxy/src/apis/cluster_info.rs +++ b/interactive_engine/executor/ir/graph_proxy/src/apis/cluster_info.rs @@ -35,27 +35,27 @@ impl ClusterInfo for PegasusClusterInfo { pegasus::get_current_worker_checked() .as_ref() .map(|info| info.servers) - .ok_or(GraphProxyError::cluster_info_missing("server number")) + .ok_or_else(|| (GraphProxyError::cluster_info_missing("server number"))) } fn get_server_index(&self) -> GraphProxyResult { pegasus::get_current_worker_checked() .as_ref() .map(|info| info.server_index) - .ok_or(GraphProxyError::cluster_info_missing("server index")) + .ok_or_else(|| (GraphProxyError::cluster_info_missing("server index"))) } fn get_local_worker_num(&self) -> GraphProxyResult { pegasus::get_current_worker_checked() .as_ref() .map(|info| info.local_peers) - .ok_or(GraphProxyError::cluster_info_missing("local worker number")) + .ok_or_else(|| (GraphProxyError::cluster_info_missing("local worker number"))) } fn get_worker_index(&self) -> GraphProxyResult { pegasus::get_current_worker_checked() .as_ref() .map(|info| info.index) - .ok_or(GraphProxyError::cluster_info_missing("worker index")) + .ok_or_else(|| (GraphProxyError::cluster_info_missing("worker index"))) } } diff --git a/interactive_engine/executor/ir/graph_proxy/src/apis/graph/element/path.rs b/interactive_engine/executor/ir/graph_proxy/src/apis/graph/element/path.rs index 298dd430c6a2..5c800a8a0746 100644 --- a/interactive_engine/executor/ir/graph_proxy/src/apis/graph/element/path.rs +++ b/interactive_engine/executor/ir/graph_proxy/src/apis/graph/element/path.rs @@ -429,7 +429,7 @@ impl TryFrom for VertexOrEdge { fn try_from(e: result_pb::graph_path::VertexOrEdge) -> Result { let vertex_or_edge = e .inner - .ok_or(ParsePbError::EmptyFieldError("empty field of VertexOrEdge".to_string()))?; + .ok_or_else(|| (ParsePbError::EmptyFieldError("empty field of VertexOrEdge".to_string())))?; match vertex_or_edge { result_pb::graph_path::vertex_or_edge::Inner::Vertex(v) => { let vertex = v.try_into()?; diff --git a/interactive_engine/executor/ir/graph_proxy/src/utils/expr/eval.rs b/interactive_engine/executor/ir/graph_proxy/src/utils/expr/eval.rs index 4373095b9405..fc51092203af 100644 --- a/interactive_engine/executor/ir/graph_proxy/src/utils/expr/eval.rs +++ b/interactive_engine/executor/ir/graph_proxy/src/utils/expr/eval.rs @@ -178,37 +178,42 @@ pub(crate) fn apply_function<'a>( Interval::Year => Ok(a .as_date_format()? .year() - .ok_or(ExprEvalError::GetNoneFromContext)? + .ok_or_else(|| ExprEvalError::GetNoneFromContext)? .into()), Interval::Month => Ok((a .as_date_format()? .month() - .ok_or(ExprEvalError::GetNoneFromContext)? as i32) + .ok_or_else(|| ExprEvalError::GetNoneFromContext)? + as i32) .into()), Interval::Day => Ok((a .as_date_format()? .day() - .ok_or(ExprEvalError::GetNoneFromContext)? as i32) + .ok_or_else(|| ExprEvalError::GetNoneFromContext)? + as i32) .into()), Interval::Hour => Ok((a .as_date_format()? .hour() - .ok_or(ExprEvalError::GetNoneFromContext)? as i32) + .ok_or_else(|| ExprEvalError::GetNoneFromContext)? + as i32) .into()), Interval::Minute => Ok((a .as_date_format()? .minute() - .ok_or(ExprEvalError::GetNoneFromContext)? as i32) + .ok_or_else(|| ExprEvalError::GetNoneFromContext)? + as i32) .into()), Interval::Second => Ok((a .as_date_format()? .second() - .ok_or(ExprEvalError::GetNoneFromContext)? as i32) + .ok_or_else(|| ExprEvalError::GetNoneFromContext)? + as i32) .into()), Interval::Millisecond => Ok((a .as_date_format()? .millisecond() - .ok_or(ExprEvalError::GetNoneFromContext)? + .ok_or_else(|| ExprEvalError::GetNoneFromContext)? as i32) .into()), }, @@ -605,13 +610,13 @@ impl Evaluate for Operand { } else { let graph_element = element .as_graph_element() - .ok_or(ExprEvalError::UnexpectedDataType(self.into()))?; + .ok_or_else(|| ExprEvalError::UnexpectedDataType(self.into()))?; match property { PropKey::Id => graph_element.id().into(), PropKey::Label => graph_element .label() .map(|label| label.into()) - .ok_or(ExprEvalError::GetNoneFromContext)?, + .ok_or_else(|| ExprEvalError::GetNoneFromContext)?, PropKey::Len => unreachable!(), PropKey::All => graph_element .get_all_properties() @@ -627,23 +632,27 @@ impl Evaluate for Operand { .collect::>() .into() }) - .ok_or(ExprEvalError::GetNoneFromContext)?, + .ok_or_else(|| ExprEvalError::GetNoneFromContext)?, PropKey::Key(key) => graph_element .get_property(key) - .ok_or(ExprEvalError::GetNoneFromContext)? + .ok_or_else(|| ExprEvalError::GetNoneFromContext)? .try_to_owned() - .ok_or(ExprEvalError::OtherErr( - "cannot get `Object` from `BorrowObject`".to_string(), - ))?, + .ok_or_else(|| { + ExprEvalError::OtherErr( + "cannot get `Object` from `BorrowObject`".to_string(), + ) + })?, } } } else { element .as_borrow_object() .try_to_owned() - .ok_or(ExprEvalError::OtherErr( - "cannot get `Object` from `BorrowObject`".to_string(), - ))? + .ok_or_else(|| { + ExprEvalError::OtherErr( + "cannot get `Object` from `BorrowObject`".to_string(), + ) + })? }; Ok(result) diff --git a/interactive_engine/executor/ir/graph_proxy/src/utils/expr/eval_pred.rs b/interactive_engine/executor/ir/graph_proxy/src/utils/expr/eval_pred.rs index eef8467ad05f..76d072cde0bf 100644 --- a/interactive_engine/executor/ir/graph_proxy/src/utils/expr/eval_pred.rs +++ b/interactive_engine/executor/ir/graph_proxy/src/utils/expr/eval_pred.rs @@ -189,7 +189,7 @@ impl TryFrom for Predicates { }; Option::::from(partial) - .ok_or(ParsePbError::ParseError("invalid `Triplet` in `IndexPredicate`".to_string())) + .ok_or_else(|| (ParsePbError::ParseError("invalid `Triplet` in `IndexPredicate`".to_string()))) } } @@ -209,7 +209,9 @@ impl TryFrom for Predicates { } } - predicates.ok_or(ParsePbError::ParseError("invalid `AndPredicate` in `IndexPredicate`".to_string())) + predicates.ok_or_else(|| { + (ParsePbError::ParseError("invalid `AndPredicate` in `IndexPredicate`".to_string())) + }) } } @@ -229,7 +231,7 @@ impl TryFrom for Predicates { } } - predicates.ok_or(ParsePbError::ParseError("invalid `IndexPredicate`".to_string())) + predicates.ok_or_else(|| (ParsePbError::ParseError("invalid `IndexPredicate`".to_string()))) } } diff --git a/interactive_engine/executor/ir/runtime/src/assembly.rs b/interactive_engine/executor/ir/runtime/src/assembly.rs index 51f5a415f4bc..5090646ed877 100644 --- a/interactive_engine/executor/ir/runtime/src/assembly.rs +++ b/interactive_engine/executor/ir/runtime/src/assembly.rs @@ -195,12 +195,11 @@ impl IRJobAssembly { let op_kind = op.try_into().map_err(|e| FnGenError::from(e))?; match op_kind { OpKind::Repartition(repartition) => { - let repartition_strategy = repartition - .strategy - .as_ref() - .ok_or(FnGenError::from(ParsePbError::EmptyFieldError( + let repartition_strategy = repartition.strategy.as_ref().ok_or_else(|| { + FnGenError::from(ParsePbError::EmptyFieldError( "Empty repartition strategy".to_string(), - )))?; + )) + })?; match repartition_strategy { pb::repartition::Strategy::ToAnother(shuffle) => { let router = self.udf_gen.gen_shuffle(shuffle)?; @@ -222,11 +221,9 @@ impl IRJobAssembly { stream = stream.flat_map_with_name("Unfold", move |input| func.exec(input))?; } OpKind::Limit(limit) => { - let range = limit - .range - .ok_or(FnGenError::from(ParsePbError::EmptyFieldError( - "pb::Limit::range".to_string(), - )))?; + let range = limit.range.ok_or_else(|| { + FnGenError::from(ParsePbError::EmptyFieldError("pb::Limit::range".to_string())) + })?; // e.g., `limit(10)` would be translate as `Range{lower=0, upper=10}` if range.upper <= range.lower || range.lower != 0 { Err(FnGenError::from(ParsePbError::ParseError(format!( @@ -323,10 +320,9 @@ impl IRJobAssembly { let apply_gen = self.udf_gen.gen_apply(apply.clone())?; let join_kind = apply_gen.get_join_kind(); let join_func = apply_gen.gen_left_join_func()?; - let sub_task = apply - .sub_plan - .as_ref() - .ok_or(BuildJobError::Unsupported("Task is missing in Apply".to_string()))?; + let sub_task = apply.sub_plan.as_ref().ok_or_else(|| { + BuildJobError::Unsupported("Task is missing in Apply".to_string()) + })?; stream = match join_kind { JoinKind::Semi => stream .apply(|sub_start| { @@ -386,11 +382,11 @@ impl IRJobAssembly { let left_task = join .left_plan .as_ref() - .ok_or("left_task is missing in merge")?; + .ok_or_else(|| FnGenError::ParseError("left_task is missing in merge".into()))?; let right_task = join .right_plan .as_ref() - .ok_or("right_task is missing in merge")?; + .ok_or_else(|| FnGenError::ParseError("right_task is missing in merge".into()))?; let (left_stream, right_stream) = stream.copied()?; let left_stream = self .install(left_stream, &left_task.plan[..])? @@ -406,9 +402,11 @@ impl IRJobAssembly { left_stream .left_outer_join(right_stream)? .map(|(left, right)| { - let left = left.ok_or(FnExecError::unexpected_data_error( - "left is None in left outer join", - ))?; + let left = left.ok_or_else(|| { + FnExecError::unexpected_data_error( + "left is None in left outer join", + ) + })?; if let Some(right) = right { // TODO(bingqing): Specify HeadJoinOpt if necessary Ok(left.value.join(right.value, None)) @@ -420,9 +418,9 @@ impl IRJobAssembly { JoinKind::RightOuter => left_stream .right_outer_join(right_stream)? .map(|(left, right)| { - let right = right.ok_or(FnExecError::unexpected_data_error( - "right is None in right outer join", - ))?; + let right = right.ok_or_else(|| { + FnExecError::unexpected_data_error("right is None in right outer join") + })?; if let Some(left) = left { Ok(left.value.join(right.value, None)) } else { @@ -468,13 +466,11 @@ impl IRJobAssembly { subplan, )))? } - let last_op = - subplan - .plan - .pop() - .ok_or(FnGenError::from(ParsePbError::EmptyFieldError( - "subplan in pb::Intersect::plan".to_string(), - )))?; + let last_op = subplan.plan.pop().ok_or_else(|| { + FnGenError::from(ParsePbError::EmptyFieldError( + "subplan in pb::Intersect::plan".to_string(), + )) + })?; let last_op_kind = last_op .try_into() .map_err(|e| FnGenError::from(e))?; @@ -536,18 +532,14 @@ impl IRJobAssembly { stream = stream.flat_map_with_name("EdgeExpand", move |input| func.exec(input))?; } OpKind::Path(path) => { - let mut base = - path.base - .clone() - .ok_or(FnGenError::from(ParsePbError::EmptyFieldError( - "pb::PathExpand::base".to_string(), - )))?; - let range = - path.hop_range - .as_ref() - .ok_or(FnGenError::from(ParsePbError::EmptyFieldError( - "pb::PathExpand::hop_range".to_string(), - )))?; + let mut base = path.base.clone().ok_or_else(|| { + FnGenError::from(ParsePbError::EmptyFieldError("pb::PathExpand::base".to_string())) + })?; + let range = path.hop_range.as_ref().ok_or_else(|| { + FnGenError::from(ParsePbError::EmptyFieldError( + "pb::PathExpand::hop_range".to_string(), + )) + })?; if range.upper <= range.lower || range.lower < 0 || range.upper <= 0 { Err(FnGenError::from(ParsePbError::ParseError(format!( "range {:?} in PathExpand Operator", @@ -710,10 +702,9 @@ impl JobAssembly for IRJobAssembly stream diff --git a/interactive_engine/executor/ir/runtime/src/process/operator/accum/accum.rs b/interactive_engine/executor/ir/runtime/src/process/operator/accum/accum.rs index 6a30d8e76268..d7c1f1fe2287 100644 --- a/interactive_engine/executor/ir/runtime/src/process/operator/accum/accum.rs +++ b/interactive_engine/executor/ir/runtime/src/process/operator/accum/accum.rs @@ -87,7 +87,9 @@ impl Accumulator for EntryAccumulator { EntryAccumulator::ToSum(sum) => { let primitive = next .as_object() - .ok_or(FnExecError::unexpected_data_error("DynEntry is not a object type `Sum`"))? + .ok_or_else(|| { + FnExecError::unexpected_data_error("DynEntry is not a object type `Sum`") + })? .as_primitive() .map_err(|e| { FnExecError::unexpected_data_error(&format!( @@ -100,7 +102,9 @@ impl Accumulator for EntryAccumulator { EntryAccumulator::ToAvg(sum, count) => { let primitive = next .as_object() - .ok_or(FnExecError::unexpected_data_error("DynEntry is not a object type `ToAvg`"))? + .ok_or_else(|| { + FnExecError::unexpected_data_error("DynEntry is not a object type `ToAvg`") + })? .as_primitive() .map_err(|e| { FnExecError::unexpected_data_error(&format!( diff --git a/interactive_engine/executor/ir/runtime/src/process/operator/accum/sample.rs b/interactive_engine/executor/ir/runtime/src/process/operator/accum/sample.rs index ea7421b6eb9d..366b22c709ff 100644 --- a/interactive_engine/executor/ir/runtime/src/process/operator/accum/sample.rs +++ b/interactive_engine/executor/ir/runtime/src/process/operator/accum/sample.rs @@ -60,12 +60,9 @@ impl Accumulator> for SampleAccum { impl SampleAccumFactoryGen for algebra_pb::Sample { fn gen_accum(self) -> FnGenResult { if let Some(sample_type) = self.sample_type { - let sample_type = - sample_type - .inner - .ok_or(FnGenError::ParseError(ParsePbError::EmptyFieldError( - "sample_type.inner".to_owned(), - )))?; + let sample_type = sample_type.inner.ok_or_else(|| { + FnGenError::ParseError(ParsePbError::EmptyFieldError("sample_type.inner".to_owned())) + })?; match sample_type { algebra_pb::sample::sample_type::Inner::SampleByNum(num) => { let sample = SampleAccum { diff --git a/interactive_engine/executor/ir/runtime/src/process/operator/filter/coin.rs b/interactive_engine/executor/ir/runtime/src/process/operator/filter/coin.rs index 51487aaa842a..28a6ec74ba5d 100644 --- a/interactive_engine/executor/ir/runtime/src/process/operator/filter/coin.rs +++ b/interactive_engine/executor/ir/runtime/src/process/operator/filter/coin.rs @@ -49,12 +49,9 @@ impl FilterFunction for CoinOperator { impl FilterFuncGen for algebra_pb::Sample { fn gen_filter(self) -> FnGenResult>> { if let Some(sample_type) = self.sample_type { - let sample_type = - sample_type - .inner - .ok_or(FnGenError::ParseError(ParsePbError::EmptyFieldError( - "sample_type.inner".to_owned(), - )))?; + let sample_type = sample_type.inner.ok_or_else(|| { + FnGenError::ParseError(ParsePbError::EmptyFieldError("sample_type.inner".to_owned())) + })?; match sample_type { algebra_pb::sample::sample_type::Inner::SampleByRatio(ratio) => { if ratio.ratio < 0.0 || ratio.ratio > 1.0 { diff --git a/interactive_engine/executor/ir/runtime/src/process/operator/flatmap/edge_expand.rs b/interactive_engine/executor/ir/runtime/src/process/operator/flatmap/edge_expand.rs index 781bf4fc3281..2220e5b21cd3 100644 --- a/interactive_engine/executor/ir/runtime/src/process/operator/flatmap/edge_expand.rs +++ b/interactive_engine/executor/ir/runtime/src/process/operator/flatmap/edge_expand.rs @@ -79,7 +79,7 @@ impl FlatMapFunction for EdgeExpandOperator< EntryType::Path => { let graph_path = entry .as_graph_path() - .ok_or(FnExecError::Unreachable)?; + .ok_or_else(|| FnExecError::Unreachable)?; let iter = self.stmt.exec(graph_path.get_path_end().id())?; let curr_path = graph_path.clone(); Ok(Box::new(RecordPathExpandIter::new(input, curr_path, iter))) @@ -99,7 +99,7 @@ impl FlatMapFuncGen for pb::EdgeExpand { fn gen_flat_map( self, ) -> FnGenResult>>> { - let graph = get_graph().ok_or(FnGenError::NullGraphError)?; + let graph = get_graph().ok_or_else(|| FnGenError::NullGraphError)?; let start_v_tag = self.v_tag; let edge_or_end_v_tag = self.alias; let direction_pb: pb::edge_expand::Direction = unsafe { ::std::mem::transmute(self.direction) }; diff --git a/interactive_engine/executor/ir/runtime/src/process/operator/flatmap/unfold.rs b/interactive_engine/executor/ir/runtime/src/process/operator/flatmap/unfold.rs index e47c5d6ce6e7..b7e73c355fb0 100644 --- a/interactive_engine/executor/ir/runtime/src/process/operator/flatmap/unfold.rs +++ b/interactive_engine/executor/ir/runtime/src/process/operator/flatmap/unfold.rs @@ -40,10 +40,12 @@ impl FlatMapFunction for UnfoldOperator { fn exec(&self, mut input: Record) -> FnResult { let entry_type = input .get(self.tag) - .ok_or(FnExecError::get_tag_error(&format!( - "get tag {:?} from record in `Unfold` operator, the record is {:?}", - self.tag, input - )))? + .ok_or_else(|| { + FnExecError::get_tag_error(&format!( + "get tag {:?} from record in `Unfold` operator, the record is {:?}", + self.tag, input + )) + })? .get_type(); match entry_type { EntryType::Intersection => { @@ -54,9 +56,9 @@ impl FlatMapFunction for UnfoldOperator { let intersection = entry .as_any_ref() .downcast_ref::() - .ok_or(FnExecError::unexpected_data_error( - "downcast intersection entry in UnfoldOperator", - ))?; + .ok_or_else(|| { + FnExecError::unexpected_data_error("downcast intersection entry in UnfoldOperator") + })?; let mut res = Vec::with_capacity(intersection.len()); for item in intersection.iter().cloned() { let mut new_entry = input.clone(); @@ -70,9 +72,9 @@ impl FlatMapFunction for UnfoldOperator { let collection = entry .as_any_ref() .downcast_ref::() - .ok_or(FnExecError::unexpected_data_error( - "downcast collection entry in UnfoldOperator", - ))?; + .ok_or_else(|| { + FnExecError::unexpected_data_error("downcast collection entry in UnfoldOperator") + })?; let mut res = Vec::with_capacity(collection.len()); for item in collection.inner.iter().cloned() { let mut new_entry = input.clone(); @@ -83,9 +85,9 @@ impl FlatMapFunction for UnfoldOperator { } EntryType::Path => { let entry = input.get(self.tag).unwrap(); - let path = entry - .as_graph_path() - .ok_or(FnExecError::unexpected_data_error("downcast path entry in UnfoldOperatro"))?; + let path = entry.as_graph_path().ok_or_else(|| { + FnExecError::unexpected_data_error("downcast path entry in UnfoldOperatro") + })?; let path_vec = if let Some(path) = path.get_path() { path.clone() } else { diff --git a/interactive_engine/executor/ir/runtime/src/process/operator/group/group.rs b/interactive_engine/executor/ir/runtime/src/process/operator/group/group.rs index 0ee1b1e2d948..02298ca31220 100644 --- a/interactive_engine/executor/ir/runtime/src/process/operator/group/group.rs +++ b/interactive_engine/executor/ir/runtime/src/process/operator/group/group.rs @@ -36,9 +36,9 @@ impl GroupGen for pb::GroupBy { fn gen_group_map(&self) -> FnGenResult>> { let mut key_aliases = Vec::with_capacity(self.mappings.len()); for key_alias in self.mappings.iter() { - let alias = key_alias - .alias - .ok_or(ParsePbError::from(format!("key alias cannot be None in group opr {:?}", self)))?; + let alias = key_alias.alias.ok_or_else(|| { + ParsePbError::from(format!("key alias cannot be None in group opr {:?}", self)) + })?; key_aliases.push(alias); } let group_map = GroupMap { key_aliases }; diff --git a/interactive_engine/executor/ir/runtime/src/process/operator/map/expand_intersect.rs b/interactive_engine/executor/ir/runtime/src/process/operator/map/expand_intersect.rs index f8838476052f..b2dd3096398c 100644 --- a/interactive_engine/executor/ir/runtime/src/process/operator/map/expand_intersect.rs +++ b/interactive_engine/executor/ir/runtime/src/process/operator/map/expand_intersect.rs @@ -148,12 +148,12 @@ impl Element for IntersectionEntry { impl FilterMapFunction for ExpandOrIntersect { fn exec(&self, mut input: Record) -> FnResult> { - let entry = input - .get(self.start_v_tag) - .ok_or(FnExecError::get_tag_error(&format!( + let entry = input.get(self.start_v_tag).ok_or_else(|| { + FnExecError::get_tag_error(&format!( "get start_v_tag {:?} from record in `ExpandOrIntersect` operator, the record is {:?}", self.start_v_tag, input - )))?; + )) + })?; match entry.get_type() { EntryType::Vertex => { let id = entry.id(); @@ -171,9 +171,11 @@ impl FilterMapFunction for ExpandOrIntersect let pre_intersection = pre_entry .as_any_mut() .downcast_mut::() - .ok_or(FnExecError::unexpected_data_error(&format!( - "entry is not a intersection in ExpandOrIntersect" - )))?; + .ok_or_else(|| { + FnExecError::unexpected_data_error(&format!( + "entry is not a intersection in ExpandOrIntersect" + )) + })?; pre_intersection.intersect(iter); if pre_intersection.is_empty() { Ok(None) @@ -204,11 +206,11 @@ impl FilterMapFunction for ExpandOrIntersect impl FilterMapFuncGen for pb::EdgeExpand { fn gen_filter_map(self) -> FnGenResult>> { - let graph = graph_proxy::apis::get_graph().ok_or(FnGenError::NullGraphError)?; + let graph = graph_proxy::apis::get_graph().ok_or_else(|| FnGenError::NullGraphError)?; let start_v_tag = self.v_tag; let edge_or_end_v_tag = self .alias - .ok_or(ParsePbError::from("`EdgeExpand::alias` cannot be empty for intersection"))?; + .ok_or_else(|| ParsePbError::from("`EdgeExpand::alias` cannot be empty for intersection"))?; let direction_pb: pb::edge_expand::Direction = unsafe { ::std::mem::transmute(self.direction) }; let direction = Direction::from(direction_pb); let query_params: QueryParams = self.params.try_into()?; diff --git a/interactive_engine/executor/ir/runtime/src/process/operator/map/get_v.rs b/interactive_engine/executor/ir/runtime/src/process/operator/map/get_v.rs index 5f4c4da50516..3666aebd5711 100644 --- a/interactive_engine/executor/ir/runtime/src/process/operator/map/get_v.rs +++ b/interactive_engine/executor/ir/runtime/src/process/operator/map/get_v.rs @@ -79,21 +79,26 @@ impl FilterMapFunction for GetVertexOperator { VOpt::Other => { let graph_path = input .get_mut(self.start_tag) - .ok_or(FnExecError::unexpected_data_error(&format!( - "get_mut of GraphPath failed in {:?}", - self - )))? + .ok_or_else(|| { + FnExecError::unexpected_data_error(&format!( + "get_mut of GraphPath failed in {:?}", + self + )) + })? .as_any_mut() .downcast_mut::() - .ok_or(FnExecError::unexpected_data_error(&format!( - "entry is not a path in GetV" - )))?; - let path_end_edge = graph_path.get_path_end().as_edge().ok_or( - FnExecError::unexpected_data_error(&format!( - "GetOtherVertex on a path entry with input: {:?}", - graph_path.get_path_end() - )), - )?; + .ok_or_else(|| { + FnExecError::unexpected_data_error(&format!("entry is not a path in GetV")) + })?; + let path_end_edge = graph_path + .get_path_end() + .as_edge() + .ok_or_else(|| { + FnExecError::unexpected_data_error(&format!( + "GetOtherVertex on a path entry with input: {:?}", + graph_path.get_path_end() + )) + })?; let label = path_end_edge.get_other_label(); if self.contains_label(label)? { let vertex = Vertex::new( @@ -111,7 +116,7 @@ impl FilterMapFunction for GetVertexOperator { let path_end_vertex = graph_path .get_path_end() .as_vertex() - .ok_or(FnExecError::unsupported_error("Get end edge on a path entry"))? + .ok_or_else(|| FnExecError::unsupported_error("Get end edge on a path entry"))? .clone(); let label = path_end_vertex.label(); if self.contains_label(label.as_ref())? { @@ -167,7 +172,7 @@ impl FilterMapFunction for AuxiliaOperator { // 2. further fetch properties, e.g., filter by columns. match entry.get_type() { EntryType::Vertex => { - let graph = get_graph().ok_or(FnExecError::NullGraphError)?; + let graph = get_graph().ok_or_else(|| FnExecError::NullGraphError)?; let id = entry.id(); if let Some(vertex) = graph .get_vertex(&[id], &self.query_params)? @@ -209,9 +214,9 @@ impl FilterMapFunction for AuxiliaOperator { // Auxilia for vertices in Path is for filtering. let graph_path = entry .as_graph_path() - .ok_or(FnExecError::Unreachable)?; + .ok_or_else(|| FnExecError::Unreachable)?; let path_end = graph_path.get_path_end(); - let graph = get_graph().ok_or(FnExecError::NullGraphError)?; + let graph = get_graph().ok_or_else(|| FnExecError::NullGraphError)?; let id = path_end.id(); if graph .get_vertex(&[id], &self.query_params)? diff --git a/interactive_engine/executor/ir/runtime/src/process/operator/map/path_end.rs b/interactive_engine/executor/ir/runtime/src/process/operator/map/path_end.rs index 337c80f1f2c7..62fae5cbdc27 100644 --- a/interactive_engine/executor/ir/runtime/src/process/operator/map/path_end.rs +++ b/interactive_engine/executor/ir/runtime/src/process/operator/map/path_end.rs @@ -31,10 +31,12 @@ impl MapFunction for PathEndOperator { if self.alias.is_some() { let entry = input .get(None) - .ok_or(FnExecError::get_tag_error(&format!( - "get None tag from the current record in `PathEnd` operator, the record is {:?}", - input - )))? + .ok_or_else(|| { + FnExecError::get_tag_error(&format!( + "get None tag from the current record in `PathEnd` operator, the record is {:?}", + input + )) + })? .clone(); input.append_arc_entry(entry.clone(), self.alias.clone()); } diff --git a/interactive_engine/executor/ir/runtime/src/process/operator/map/path_start.rs b/interactive_engine/executor/ir/runtime/src/process/operator/map/path_start.rs index 120a720b50d8..558d40023d76 100644 --- a/interactive_engine/executor/ir/runtime/src/process/operator/map/path_start.rs +++ b/interactive_engine/executor/ir/runtime/src/process/operator/map/path_start.rs @@ -35,12 +35,12 @@ struct PathStartOperator { impl FilterMapFunction for PathStartOperator { fn exec(&self, mut input: Record) -> FnResult> { if let Some(entry) = input.get(self.start_tag) { - let v = entry - .as_vertex() - .ok_or(FnExecError::unexpected_data_error(&format!( + let v = entry.as_vertex().ok_or_else(|| { + FnExecError::unexpected_data_error(&format!( "tag {:?} does not refer to a graph vertex element in record {:?}", self.start_tag, input - )))?; + )) + })?; let graph_path = GraphPath::new(v.clone(), self.path_opt, self.result_opt); input.append(graph_path, None); Ok(Some(input)) diff --git a/interactive_engine/executor/ir/runtime/src/process/operator/map/project.rs b/interactive_engine/executor/ir/runtime/src/process/operator/map/project.rs index 86da361f89a9..8e9a58c8e39a 100644 --- a/interactive_engine/executor/ir/runtime/src/process/operator/map/project.rs +++ b/interactive_engine/executor/ir/runtime/src/process/operator/map/project.rs @@ -138,7 +138,7 @@ impl FilterMapFuncGen for pb::Project { for expr_alias in self.mappings.into_iter() { let expr = expr_alias .expr - .ok_or(ParsePbError::from("expr eval is missing in project"))?; + .ok_or_else(|| ParsePbError::from("expr eval is missing in project"))?; let projector = if expr.operators.len() == 1 { match expr.operators.get(0).unwrap() { common_pb::ExprOpr { item: Some(common_pb::expr_opr::Item::Var(var)), .. } => { @@ -159,21 +159,13 @@ impl FilterMapFuncGen for pb::Project { common_pb::ExprOpr { item: Some(common_pb::expr_opr::Item::Map(key_vals)), .. } => { let mut key_value_vec = Vec::with_capacity(key_vals.key_vals.len()); for key_val in key_vals.key_vals.iter() { - let key = key_val - .key - .as_ref() - .ok_or(ParsePbError::EmptyFieldError(format!( - "key in Map Expr {:?}", - key_val - )))?; + let key = key_val.key.as_ref().ok_or_else(|| { + ParsePbError::EmptyFieldError(format!("key in Map Expr {:?}", key_val)) + })?; let key_obj = Object::try_from(key.clone())?; - let val = key_val - .value - .as_ref() - .ok_or(ParsePbError::EmptyFieldError(format!( - "value in Map Expr {:?}", - key_val - )))?; + let val = key_val.value.as_ref().ok_or_else(|| { + ParsePbError::EmptyFieldError(format!("value in Map Expr {:?}", key_val)) + })?; let tag_key = TagKey::try_from(val.clone())?; key_value_vec.push((Some(key_obj), tag_key)); } @@ -215,8 +207,8 @@ mod tests { use crate::process::operator::map::FilterMapFuncGen; use crate::process::operator::tests::{ init_source, init_source_with_multi_tags, init_source_with_tag, init_vertex1, init_vertex2, - to_expr_map_pb, to_expr_var_pb, to_expr_vars_pb, to_var_pb, PERSON_LABEL, TAG_A, TAG_B, TAG_C, TAG_D, TAG_E, TAG_F, - TAG_G, + to_expr_map_pb, to_expr_var_pb, to_expr_vars_pb, to_var_pb, PERSON_LABEL, TAG_A, TAG_B, TAG_C, + TAG_D, TAG_E, TAG_F, TAG_G, }; use crate::process::record::Record; diff --git a/interactive_engine/executor/ir/runtime/src/process/operator/mod.rs b/interactive_engine/executor/ir/runtime/src/process/operator/mod.rs index 3ad50234d444..00db86cf7cd9 100644 --- a/interactive_engine/executor/ir/runtime/src/process/operator/mod.rs +++ b/interactive_engine/executor/ir/runtime/src/process/operator/mod.rs @@ -92,11 +92,9 @@ impl TagKey { } PropKey::Key(key) => { if let Some(properties) = element.get_property(key) { - properties - .try_to_owned() - .ok_or(FnExecError::unexpected_data_error( - "unable to own the `BorrowObject`", - ))? + properties.try_to_owned().ok_or_else(|| { + FnExecError::unexpected_data_error("unable to own the `BorrowObject`") + })? } else { Object::None } diff --git a/interactive_engine/executor/ir/runtime/src/process/operator/sink/mod.rs b/interactive_engine/executor/ir/runtime/src/process/operator/sink/mod.rs index 605b51834cc3..b13cbe09d09f 100644 --- a/interactive_engine/executor/ir/runtime/src/process/operator/sink/mod.rs +++ b/interactive_engine/executor/ir/runtime/src/process/operator/sink/mod.rs @@ -52,7 +52,7 @@ impl SinkGen for pb::Sink { if let Some(sink_target) = self.sink_target { let inner = sink_target .inner - .ok_or(ParsePbError::EmptyFieldError("sink_target inner is missing".to_string()))?; + .ok_or_else(|| ParsePbError::EmptyFieldError("sink_target inner is missing".to_string()))?; let tags = self .tags .into_iter() diff --git a/interactive_engine/executor/ir/runtime/src/process/operator/sink/sink.rs b/interactive_engine/executor/ir/runtime/src/process/operator/sink/sink.rs index 756d2c68c128..5c570c48f172 100644 --- a/interactive_engine/executor/ir/runtime/src/process/operator/sink/sink.rs +++ b/interactive_engine/executor/ir/runtime/src/process/operator/sink/sink.rs @@ -28,7 +28,7 @@ use pegasus::api::function::{FnResult, MapFunction}; use pegasus_common::downcast::AsAny; use prost::Message; -use crate::error::{FnGenResult, FnExecResult, FnExecError}; +use crate::error::{FnExecError, FnExecResult, FnGenResult}; use crate::process::entry::{CollectionEntry, DynEntry, Entry, EntryType, PairEntry}; use crate::process::operator::map::IntersectionEntry; use crate::process::operator::sink::{SinkGen, Sinker}; @@ -144,8 +144,11 @@ impl RecordSinkEncoder { let val_pb = self.element_to_pb(pair.get_right()); key_values.push(result_pb::key_values::KeyValue { key: Some(key_pb), value: Some(val_pb) }) } else { - Err(FnExecError::unsupported_error(&format!("only support map result with object key, while it is {:?}", pair.get_left())))? - } + Err(FnExecError::unsupported_error(&format!( + "only support map result with object key, while it is {:?}", + pair.get_left() + )))? + } } Ok(result_pb::KeyValues { key_values }) } diff --git a/interactive_engine/executor/ir/runtime/src/process/operator/sink/sink_vineyard.rs b/interactive_engine/executor/ir/runtime/src/process/operator/sink/sink_vineyard.rs index 49d66812afb3..43a07e962b8c 100644 --- a/interactive_engine/executor/ir/runtime/src/process/operator/sink/sink_vineyard.rs +++ b/interactive_engine/executor/ir/runtime/src/process/operator/sink/sink_vineyard.rs @@ -36,24 +36,21 @@ pub struct GraphSinkEncoder { impl Accumulator for GraphSinkEncoder { fn accum(&mut self, mut next: Record) -> FnExecResult<()> { - let graph = get_graph().ok_or(FnExecError::NullGraphError)?; + let graph = get_graph().ok_or_else(|| FnExecError::NullGraphError)?; for sink_key in &self.sink_keys { - let entry = next - .take(sink_key.as_ref()) - .ok_or(FnExecError::get_tag_error(&format!( - "tag {:?} in GraphWriter on {:?}", - sink_key, next - )))?; + let entry = next.take(sink_key.as_ref()).ok_or_else(|| { + FnExecError::get_tag_error(&format!("tag {:?} in GraphWriter on {:?}", sink_key, next)) + })?; if let Some(v) = entry.as_vertex() { - let vertex_pk = graph - .get_primary_key(&v.id())? - .ok_or(GraphProxyError::query_store_error("get_primary_key() returns empty pk"))?; - let label = v - .label() - .ok_or(FnExecError::unexpected_data_error(&format!( + let vertex_pk = graph.get_primary_key(&v.id())?.ok_or_else(|| { + GraphProxyError::query_store_error("get_primary_key() returns empty pk") + })?; + let label = v.label().ok_or_else(|| { + FnExecError::unexpected_data_error(&format!( "label of vertex {:?} is None in sink_vineyard", v.id() - )))?; + )) + })?; loop { if let Ok(mut graph_writer_guard) = self.graph_writer.try_lock() { graph_writer_guard.add_vertex(label.clone(), vertex_pk, v.get_details().clone())?; @@ -61,36 +58,38 @@ impl Accumulator for GraphSinkEncoder { } } } else if let Some(e) = entry.as_edge() { - let src_vertex_pk = - graph - .get_primary_key(&e.src_id)? - .ok_or(GraphProxyError::query_store_error( + let src_vertex_pk = graph + .get_primary_key(&e.src_id)? + .ok_or_else(|| { + GraphProxyError::query_store_error( "get_primary_key() of src_vertex returns empty pk", - ))?; - let dst_vertex_pk = - graph - .get_primary_key(&e.dst_id)? - .ok_or(GraphProxyError::query_store_error( + ) + })?; + let dst_vertex_pk = graph + .get_primary_key(&e.dst_id)? + .ok_or_else(|| { + GraphProxyError::query_store_error( "get_primary_key() of src_vertex returns empty pk", - ))?; - let label = e - .label() - .ok_or(FnExecError::unexpected_data_error(&format!( + ) + })?; + let label = e.label().ok_or_else(|| { + FnExecError::unexpected_data_error(&format!( "label of edge {:?} is None in sink_vineyard", e.id() - )))?; - let src_label = e - .get_src_label() - .ok_or(FnExecError::unexpected_data_error(&format!( + )) + })?; + let src_label = e.get_src_label().ok_or_else(|| { + FnExecError::unexpected_data_error(&format!( "src_label of edge {:?} is None in sink_vineyard", e.id() - )))?; - let dst_label = e - .get_dst_label() - .ok_or(FnExecError::unexpected_data_error(&format!( + )) + })?; + let dst_label = e.get_dst_label().ok_or_else(|| { + FnExecError::unexpected_data_error(&format!( "dst_label of edge {:?} is None in sink_vineyard", e.id() - )))?; + )) + })?; loop { if let Ok(mut graph_writer_guard) = self.graph_writer.try_lock() { graph_writer_guard.add_edge( diff --git a/interactive_engine/executor/ir/runtime/src/process/operator/sort/sort.rs b/interactive_engine/executor/ir/runtime/src/process/operator/sort/sort.rs index 55cf5f3fd9db..fb079d22b692 100644 --- a/interactive_engine/executor/ir/runtime/src/process/operator/sort/sort.rs +++ b/interactive_engine/executor/ir/runtime/src/process/operator/sort/sort.rs @@ -72,7 +72,7 @@ impl TryFrom for RecordCompare { for order_pair in order_pb.pairs { let key = order_pair .key - .ok_or(ParsePbError::EmptyFieldError("key is empty in order".to_string()))? + .ok_or_else(|| ParsePbError::EmptyFieldError("key is empty in order".to_string()))? .try_into()?; let order: Order = unsafe { ::std::mem::transmute(order_pair.order) }; tag_key_order.push((key, order)); diff --git a/interactive_engine/executor/ir/runtime/src/process/operator/source.rs b/interactive_engine/executor/ir/runtime/src/process/operator/source.rs index 6d37ac326f2c..d54657032936 100644 --- a/interactive_engine/executor/ir/runtime/src/process/operator/source.rs +++ b/interactive_engine/executor/ir/runtime/src/process/operator/source.rs @@ -131,7 +131,7 @@ impl SourceOperator { impl SourceOperator { pub fn gen_source(self, worker_index: usize) -> FnGenResult + Send>> { - let graph = get_graph().ok_or(FnGenError::NullGraphError)?; + let graph = get_graph().ok_or_else(|| FnGenError::NullGraphError)?; match self.source_type { SourceType::Vertex => { diff --git a/interactive_engine/executor/ir/runtime/src/process/operator/subtask/apply.rs b/interactive_engine/executor/ir/runtime/src/process/operator/subtask/apply.rs index c4da2cd8fc92..d13cd6ad80fe 100644 --- a/interactive_engine/executor/ir/runtime/src/process/operator/subtask/apply.rs +++ b/interactive_engine/executor/ir/runtime/src/process/operator/subtask/apply.rs @@ -39,12 +39,12 @@ impl BinaryFunction, Option> for ApplyOperator { } else { let sub_result = sub.get(0).unwrap(); // We assume the result of sub_entry is always saved on head of Record for now. - let sub_entry = sub_result - .get(None) - .ok_or(FnExecError::get_tag_error(&format!( + let sub_entry = sub_result.get(None).ok_or_else(|| { + FnExecError::get_tag_error(&format!( "get None tag from the sub record in `Apply` operator, the record is {:?}", sub_result - )))?; + )) + })?; if let Some(alias) = self.alias.as_ref() { // append sub_entry without moving head let columns = parent.get_columns_mut(); @@ -68,12 +68,12 @@ impl BinaryFunction, Option> for ApplyOperator { } else { let sub_result = sub.get(0).unwrap(); // We assume the result of sub_entry is always saved on head of Record for now. - let sub_entry = sub_result - .get(None) - .ok_or(FnExecError::get_tag_error(&format!( + let sub_entry = sub_result.get(None).ok_or_else(|| { + FnExecError::get_tag_error(&format!( "get None tag from the sub record in `Apply` operator, the record is {:?}", sub_result - )))?; + )) + })?; if let Some(alias) = self.alias.as_ref() { let columns = parent.get_columns_mut(); columns.insert(*alias as usize, sub_entry.clone()); diff --git a/interactive_engine/executor/store/exp_store/src/config.rs b/interactive_engine/executor/store/exp_store/src/config.rs index 2642fcd4dd34..b6dad8bbedf2 100644 --- a/interactive_engine/executor/store/exp_store/src/config.rs +++ b/interactive_engine/executor/store/exp_store/src/config.rs @@ -195,9 +195,9 @@ impl GraphDBConfig { let curr_path = entry?.path(); let path_str = curr_path .file_name() - .ok_or(GDBError::UnknownError)? + .ok_or_else(|| GDBError::UnknownError)? .to_str() - .ok_or(GDBError::UnknownError)? + .ok_or_else(|| GDBError::UnknownError)? .to_string(); if !path_str.starts_with(PARTITION_PREFIX) { diff --git a/interactive_engine/executor/store/exp_store/src/ldbc.rs b/interactive_engine/executor/store/exp_store/src/ldbc.rs index 9bb60f5023e8..1e9bdc5c45b8 100644 --- a/interactive_engine/executor/store/exp_store/src/ldbc.rs +++ b/interactive_engine/executor/store/exp_store/src/ldbc.rs @@ -220,15 +220,15 @@ impl LDBCParser { .meta_data .vertex_map .get(vertex_type) - .ok_or(GDBError::InvalidTypeError(vertex_type.to_string()))?; + .ok_or_else(|| GDBError::InvalidTypeError(vertex_type.to_string()))?; let id_index = vertex_meta .get_column_index(ID_FIELD) - .ok_or(GDBError::FieldNotExistError)?; + .ok_or_else(|| GDBError::FieldNotExistError)?; let label_index = vertex_meta.get_column_index(LABEL_FIELD); let vertex_type_id = *self .vertex_type_to_id .get(vertex_type) - .ok_or(GDBError::InvalidTypeError(vertex_type.to_string()))?; + .ok_or_else(|| GDBError::InvalidTypeError(vertex_type.to_string()))?; Ok(LDBCVertexParser { vertex_type: vertex_type_id, @@ -245,25 +245,25 @@ impl LDBCParser { .meta_data .edge_map .get(edge_type.get()) - .ok_or(GDBError::InvalidTypeError(edge_type.get().to_string()))?; + .ok_or_else(|| GDBError::InvalidTypeError(edge_type.get().to_string()))?; let src_id_index = edge_meta .get_column_index(START_ID_FIELD) - .ok_or(GDBError::ParseError)?; + .ok_or_else(|| GDBError::ParseError)?; let dst_id_index = edge_meta .get_column_index(END_ID_FIELD) - .ok_or(GDBError::ParseError)?; + .ok_or_else(|| GDBError::ParseError)?; let edge_type_id = *self .edge_type_to_id .get(edge_type.get()) - .ok_or(GDBError::InvalidTypeError(edge_type.get().to_string()))?; + .ok_or_else(|| GDBError::InvalidTypeError(edge_type.get().to_string()))?; let src_vertex_type_id = *self .vertex_type_to_id .get(edge_type.get_src()) - .ok_or(GDBError::InvalidTypeError(edge_type.get_src().to_string()))?; + .ok_or_else(|| GDBError::InvalidTypeError(edge_type.get_src().to_string()))?; let dst_vertex_type_id = *self .vertex_type_to_id .get(edge_type.get_dst()) - .ok_or(GDBError::InvalidTypeError(edge_type.get_dst().to_string()))?; + .ok_or_else(|| GDBError::InvalidTypeError(edge_type.get_dst().to_string()))?; Ok(LDBCEdgeParser { src_id_index, @@ -374,7 +374,7 @@ impl ParserTrait for LDBCVertex extra_label_id = *self .vertex_type_to_id .get(&record.to_uppercase()) - .ok_or(GDBError::FieldNotExistError)?; + .ok_or_else(|| GDBError::FieldNotExistError)?; // can break here because id always presents before label break; } @@ -751,9 +751,9 @@ impl GraphLoad fn get_fname_from_path(path: &PathBuf) -> GDBResult<&str> { let fname = path .file_name() - .ok_or(GDBError::UnknownError)? + .ok_or_else(|| GDBError::UnknownError)? .to_str() - .ok_or(GDBError::UnknownError)?; + .ok_or_else(|| GDBError::UnknownError)?; Ok(fname) }