diff --git a/flex/codegen/src/hqps_generator.h b/flex/codegen/src/hqps_generator.h index 03e18272cc7b..cf95ce15823b 100644 --- a/flex/codegen/src/hqps_generator.h +++ b/flex/codegen/src/hqps_generator.h @@ -146,7 +146,12 @@ void build_fused_edge_get_v( CHECK(vertex_labels.size() > 0); edge_expand_op.set_expand_opt( physical::EdgeExpand::ExpandOpt::EdgeExpand_ExpandOpt_VERTEX); - edge_expand_op.mutable_alias()->set_value(get_v_op.alias().value()); + if (get_v_op.has_alias()) { + edge_expand_op.mutable_alias()->set_value(get_v_op.alias().value()); + } else { + edge_expand_op.mutable_alias()->set_value(-1); + } + ss << _4_SPACES << BuildEdgeExpandOp(ctx, edge_expand_op, edge_meta_data, vertex_labels) diff --git a/flex/engines/hqps_db/database/adj_list.h b/flex/engines/hqps_db/database/adj_list.h index 0e8c6fa0a194..0c42432c69fe 100644 --- a/flex/engines/hqps_db/database/adj_list.h +++ b/flex/engines/hqps_db/database/adj_list.h @@ -114,12 +114,10 @@ class SinglePropGetter { using value_type = T; static constexpr size_t prop_num = 1; SinglePropGetter() {} - SinglePropGetter(std::shared_ptr> c) : column(c) { - CHECK(column.get() != nullptr); - } + SinglePropGetter(std::shared_ptr> c) : column(c) {} inline value_type get_view(vid_t vid) const { - if (vid == NONE) { + if (vid == NONE || column == nullptr) { return NullRecordCreator::GetNull(); } return column->get_view(vid); @@ -149,15 +147,25 @@ class MultiPropGetter { if (vid == NONE) { return NullRecordCreator::GetNull(); } - return get_view(vid, std::make_index_sequence()); + result_tuple_t ret; + fill_result_tuple(ret, vid); + return ret; } - template - inline result_tuple_t get_view(vid_t vid, std::index_sequence) const { - if (vid == NONE) { - return NullRecordCreator::GetNull(); + template + inline typename std::enable_if::type + fill_result_tuple(result_tuple_t& ret, vid_t vid) const {} + + template + inline typename std::enable_if<(I < sizeof...(T)), void>::type + fill_result_tuple(result_tuple_t& ret, vid_t vid) const { + using cur_ele_t = typename std::tuple_element::type; + if (std::get(column) == nullptr) { + std::get(ret) = NullRecordCreator::GetNull(); + } else { + std::get(ret) = std::get(column)->get_view(vid); } - return std::make_tuple(std::get(column)->get_view(vid)...); + fill_result_tuple(ret, vid); } inline MultiPropGetter& operator=(const MultiPropGetter& d) { diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/integration/suite/pattern/PatternQueryTest.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/integration/suite/pattern/PatternQueryTest.java index d6578865b045..6f24cc46a240 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/integration/suite/pattern/PatternQueryTest.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/integration/suite/pattern/PatternQueryTest.java @@ -58,6 +58,8 @@ public abstract class PatternQueryTest extends AbstractGremlinProcessTest { public abstract Traversal get_pattern_16_test(); + public abstract Traversal get_pattern_17_test(); + @Test public void run_pattern_1_test() { Traversal traversal = this.get_pattern_1_test(); @@ -170,6 +172,13 @@ public void run_pattern_16_test() { Assert.assertEquals(23286L, traversal.next().longValue()); } + @Test + public void run_pattern_17_test() { + Traversal traversal = this.get_pattern_17_test(); + this.printTraversalForm(traversal); + Assert.assertEquals(17367L, traversal.next().longValue()); + } + public static class Traversals extends PatternQueryTest { // PM1 @@ -356,5 +365,26 @@ public Traversal get_pattern_16_test() { .by("firstName") .count(); } + + @Override + public Traversal get_pattern_17_test() { + return g.V().match( + __.as("a") + .hasLabel("PERSON") + .out("HASINTEREST") + .hasLabel("TAG") + .as("b"), + __.as("a") + .hasLabel("PERSON") + .in("HASCREATOR") + .hasLabel("COMMENT", "POST") + .as("c"), + __.as("c") + .hasLabel("COMMENT", "POST") + .out("HASTAG") + .hasLabel("TAG") + .as("b")) + .count(); + } } } diff --git a/interactive_engine/executor/ir/common/src/utils.rs b/interactive_engine/executor/ir/common/src/utils.rs index 4401478fe8c0..824b2bd3f524 100644 --- a/interactive_engine/executor/ir/common/src/utils.rs +++ b/interactive_engine/executor/ir/common/src/utils.rs @@ -687,13 +687,24 @@ impl From<(pb::EdgeExpand, pb::GetV)> for pb::path_expand::ExpandBase { } impl pb::QueryParams { - // is_queryable doesn't consider tables as we assume that the table info can be inferred directly from current data. - pub fn is_queryable(&self) -> bool { - !(self.predicate.is_none() - && self.limit.is_none() - && self.sample_ratio == 1.0 - && self.columns.is_empty() - && !self.is_all_columns) + pub fn has_labels(&self) -> bool { + !self.tables.is_empty() + } + + pub fn has_columns(&self) -> bool { + !self.columns.is_empty() || self.is_all_columns + } + + pub fn has_predicates(&self) -> bool { + self.predicate.is_some() + } + + pub fn has_sample(&self) -> bool { + self.sample_ratio != 1.0 + } + + pub fn has_limit(&self) -> bool { + self.limit.is_some() } pub fn is_empty(&self) -> bool { diff --git a/interactive_engine/executor/ir/core/src/plan/logical.rs b/interactive_engine/executor/ir/core/src/plan/logical.rs index d5b061bfb799..b5f81406bc60 100644 --- a/interactive_engine/executor/ir/core/src/plan/logical.rs +++ b/interactive_engine/executor/ir/core/src/plan/logical.rs @@ -1477,7 +1477,13 @@ fn is_whole_graph(operator: &pb::logical_plan::Operator) -> bool { && scan .params .as_ref() - .map(|params| !params.is_queryable() && is_params_all_labels(params)) + .map(|params| { + !(params.has_columns() + || params.has_predicates() + || params.has_sample() + || params.has_limit()) + && is_params_all_labels(params) + }) .unwrap_or(true) } pb::logical_plan::operator::Opr::Root(_) => true, diff --git a/interactive_engine/executor/ir/core/src/plan/physical.rs b/interactive_engine/executor/ir/core/src/plan/physical.rs index f55e451114af..8363740c9706 100644 --- a/interactive_engine/executor/ir/core/src/plan/physical.rs +++ b/interactive_engine/executor/ir/core/src/plan/physical.rs @@ -262,7 +262,10 @@ impl AsPhysical for pb::PathExpand { let getv = getv.unwrap(); if edge_expand.expand_opt == pb::edge_expand::ExpandOpt::Edge as i32 { let has_getv_filter = if let Some(params) = getv.params.as_ref() { - params.is_queryable() || !params.tables.is_empty() + // In RBO, sample_ratio and limit would be fused into EdgeExpand(Opt=Edge), rather than GetV, + // thus we do not consider them here. + // TODO: Notice that we consider table here since we cannot specify vertex labels in ExpandV + params.has_predicates() || params.has_columns() || params.has_labels() } else { false }; @@ -448,9 +451,9 @@ fn build_and_try_fuse_get_v(builder: &mut PlanBuilder, mut get_v: pb::GetV) -> I return Err(IrError::Unsupported("Try to fuse GetV with Opt=Self into ExpandE".to_string())); } if let Some(params) = get_v.params.as_mut() { - if params.is_queryable() { + if params.has_predicates() || params.has_columns() { return Err(IrError::Unsupported("Try to fuse GetV with predicates into ExpandE".to_string())); - } else if !params.tables.is_empty() { + } else if params.has_labels() { // although this doesn't need query, it cannot be fused into ExpandExpand since we cannot specify vertex labels in ExpandV builder.get_v(get_v); return Ok(()); @@ -502,7 +505,7 @@ impl AsPhysical for pb::GetV { let mut getv = self.clone(); // If GetV(Adj) with filter, translate GetV into GetV(GetAdj) + Shuffle (if on distributed storage) + GetV(Self) if let Some(params) = getv.params.as_mut() { - if params.is_queryable() { + if params.has_predicates() || params.has_columns() { let auxilia = pb::GetV { tag: None, opt: 4, //ItSelf @@ -1069,7 +1072,7 @@ fn add_intersect_job_builder( // vertex parameter after the intersection if let Some(params) = get_v.params.as_ref() { // the case that we need to further process getV's filter. - if params.is_queryable() || !params.tables.is_empty() { + if params.has_predicates() || params.has_columns() || params.has_labels() { get_v.opt = 4; auxilia = Some(get_v.clone()); } @@ -1089,6 +1092,9 @@ fn add_intersect_job_builder( // add vertex filters if let Some(mut auxilia) = auxilia { auxilia.tag = Some(intersect_tag.clone()); + if plan_meta.is_partition() { + builder.shuffle(Some(intersect_tag.clone())); + } builder.get_v(auxilia); } Ok(()) diff --git a/interactive_engine/executor/ir/graph_proxy/src/apis/graph/mod.rs b/interactive_engine/executor/ir/graph_proxy/src/apis/graph/mod.rs index cc5e1040033a..b38b0e8538be 100644 --- a/interactive_engine/executor/ir/graph_proxy/src/apis/graph/mod.rs +++ b/interactive_engine/executor/ir/graph_proxy/src/apis/graph/mod.rs @@ -174,4 +174,16 @@ impl QueryParams { None } } + + pub fn has_labels(&self) -> bool { + !self.labels.is_empty() + } + + pub fn has_predicates(&self) -> bool { + self.filter.is_some() + } + + pub fn has_columns(&self) -> bool { + self.columns.is_some() + } } diff --git a/interactive_engine/executor/ir/runtime/src/assembly.rs b/interactive_engine/executor/ir/runtime/src/assembly.rs index 5090646ed877..e66721d767e8 100644 --- a/interactive_engine/executor/ir/runtime/src/assembly.rs +++ b/interactive_engine/executor/ir/runtime/src/assembly.rs @@ -627,8 +627,8 @@ impl IRJobAssembly { OpKind::Scan(scan) => { let udf_gen = self.udf_gen.clone(); stream = stream.flat_map(move |_| { - let scan_iter = udf_gen.gen_source(scan.clone().into()).unwrap(); - Ok(scan_iter) + let scan_iter = udf_gen.gen_source(scan.clone().into()); + Ok(scan_iter?) })?; } OpKind::Sample(sample) => { diff --git a/interactive_engine/executor/ir/runtime/src/error.rs b/interactive_engine/executor/ir/runtime/src/error.rs index 169a52456b8c..3714e4371fab 100644 --- a/interactive_engine/executor/ir/runtime/src/error.rs +++ b/interactive_engine/executor/ir/runtime/src/error.rs @@ -75,6 +75,13 @@ impl From for FnGenError { } } +impl From for DynError { + fn from(e: FnGenError) -> Self { + let err: Box = e.into(); + err + } +} + impl From for BuildJobError { fn from(e: FnGenError) -> Self { match e { diff --git a/interactive_engine/executor/ir/runtime/src/process/operator/map/get_v.rs b/interactive_engine/executor/ir/runtime/src/process/operator/map/get_v.rs index 3666aebd5711..e8cbf40f03b3 100644 --- a/interactive_engine/executor/ir/runtime/src/process/operator/map/get_v.rs +++ b/interactive_engine/executor/ir/runtime/src/process/operator/map/get_v.rs @@ -159,17 +159,22 @@ impl FilterMapFunction for AuxiliaOperator { // e.g., for g.V().out().as("a").has("name", "marko"), we should compile as: // g.V().out().auxilia(as("a"))... where we give alias in auxilia, // then we set tag=None and alias="a" in auxilia - // 1. filter by labels. - if !self.query_params.labels.is_empty() && entry.label().is_some() { + + // 1. If to filter by labels, and the entry itself carries label information already, directly eval it without query the store + if self.query_params.has_labels() && entry.label().is_some() { if !self .query_params .labels .contains(&entry.label().unwrap()) { + // pruning by labels return Ok(None); + } else if !self.query_params.has_predicates() && !self.query_params.has_columns() { + // if only filter by labels, directly return the results. + return Ok(Some(input)); } } - // 2. further fetch properties, e.g., filter by columns. + // 2. Otherwise, filter after query store, e.g., the case of filter by columns. match entry.get_type() { EntryType::Vertex => { let graph = get_graph().ok_or_else(|| FnExecError::NullGraphError)?; @@ -248,7 +253,7 @@ impl FilterMapFuncGen for pb::GetV { VOpt::Start | VOpt::End | VOpt::Other => { let mut tables_condition: Vec = vec![]; if let Some(params) = self.params { - if params.is_queryable() { + if params.has_predicates() || params.has_columns() { Err(FnGenError::unsupported_error(&format!("QueryParams in GetV {:?}", params)))? } else { tables_condition = params diff --git a/interactive_engine/executor/ir/runtime/src/process/operator/source.rs b/interactive_engine/executor/ir/runtime/src/process/operator/source.rs index d54657032936..3e3333e32cc7 100644 --- a/interactive_engine/executor/ir/runtime/src/process/operator/source.rs +++ b/interactive_engine/executor/ir/runtime/src/process/operator/source.rs @@ -149,7 +149,7 @@ impl SourceOperator { )); } } else if let Some(pkvs) = &self.primary_key_values { - if self.query_params.labels.is_empty() { + if !self.query_params.has_labels() { Err(FnGenError::unsupported_error( "Empty label in `IndexScan` self.query_params.labels", ))?