From 8b60b233cbe7e7868165050af7d50cc52d396e88 Mon Sep 17 00:00:00 2001 From: "bingqing.lbq" Date: Sun, 16 Jul 2023 09:43:32 +0000 Subject: [PATCH 1/3] [BugFix] fix bug when process pk scan of multiple labels Committed-by: bingqing.lbq from Dev container --- .../executor/ir/core/src/plan/logical.rs | 66 +++++++++++++++++-- .../ir/runtime/src/process/operator/source.rs | 24 +++---- 2 files changed, 73 insertions(+), 17 deletions(-) diff --git a/interactive_engine/executor/ir/core/src/plan/logical.rs b/interactive_engine/executor/ir/core/src/plan/logical.rs index bb0d764824d7..dd1eec5821e6 100644 --- a/interactive_engine/executor/ir/core/src/plan/logical.rs +++ b/interactive_engine/executor/ir/core/src/plan/logical.rs @@ -693,8 +693,9 @@ fn check_primary_key_from_pb( } /// To optimize a triplet predicate of into an `IndexPredicate`. +/// Notice that multiple tables allowed. If the tables have the same pk, it can be optimized into one `IndexPredicate`. fn triplet_to_index_predicate( - operators: &[common_pb::ExprOpr], table: &common_pb::NameOrId, is_vertex: bool, meta: &StoreMeta, + operators: &[common_pb::ExprOpr], tables: &Vec<common_pb::NameOrId>, is_vertex: bool, meta: &StoreMeta, ) -> IrResult<Option<pb::IndexPredicate>> { if operators.len() != 3 { return Ok(None); @@ -713,9 +714,16 @@ fn triplet_to_index_predicate( if let Some(item) = &property.item { match item { common_pb::property::Item::Key(col) => { - let (is_pk, num_pks) = - check_primary_key_from_pb(schema, table, col, is_vertex); - if is_pk && num_pks == 1 { + let mut is_pk = true; + for table in tables { + let (is_pk_, num_pks) = + check_primary_key_from_pb(schema, table, col, is_vertex); + if !is_pk_ || num_pks != 1 { + is_pk = false; + break; + } + } + if is_pk { key = Some(property.clone()); } } @@ -1079,12 +1087,12 @@ impl AsLogical for pb::Scan { } if let Some(params) = self.params.as_mut() { if self.idx_predicate.is_none() { - if let Some(table) = params.tables.get(0) { + if !params.tables.is_empty() { let mut idx_pred = None; if let Some(expr) 
= &params.predicate { idx_pred = triplet_to_index_predicate( expr.operators.as_slice(), - table, + &params.tables, self.scan_opt != 1, meta, )?; @@ -1920,6 +1928,52 @@ mod test { ); } + // e.g., g.V().hasLabel("person", "software").has("name", "John") + #[test] + fn scan_multi_labels_pred_to_idx_pred() { + let mut plan_meta = PlanMeta::default(); + plan_meta.set_curr_node(0); + plan_meta.curr_node_meta_mut(); + plan_meta.refer_to_nodes(0, vec![0]); + let meta = StoreMeta { + schema: Some( + Schema::from_json(std::fs::File::open("resource/modern_schema_pk.json").unwrap()).unwrap(), + ), + }; + let mut scan = pb::Scan { + scan_opt: 0, + alias: None, + params: Some(pb::QueryParams { + tables: vec!["person".into(), "software".into()], + columns: vec![], + is_all_columns: false, + limit: None, + predicate: Some(str_to_expr_pb("@.name == \"John\"".to_string()).unwrap()), + sample_ratio: 1.0, + extra: HashMap::new(), + }), + idx_predicate: None, + meta_data: None, + }; + + scan.preprocess(&meta, &mut plan_meta).unwrap(); + assert!(scan.params.unwrap().predicate.is_none()); + assert_eq!( + scan.idx_predicate.unwrap(), + pb::IndexPredicate { + or_predicates: vec![pb::index_predicate::AndPredicate { + predicates: vec![pb::index_predicate::Triplet { + key: Some(common_pb::Property { + item: Some(common_pb::property::Item::Key("name".into())), + }), + value: Some("John".to_string().into()), + cmp: None, + }] + }] + } + ); + } + #[test] fn column_maintain_case1() { let mut plan = LogicalPlan::default(); diff --git a/interactive_engine/executor/ir/runtime/src/process/operator/source.rs b/interactive_engine/executor/ir/runtime/src/process/operator/source.rs index 0603836e25e8..1a9dcc7b312e 100644 --- a/interactive_engine/executor/ir/runtime/src/process/operator/source.rs +++ b/interactive_engine/executor/ir/runtime/src/process/operator/source.rs @@ -135,19 +135,21 @@ impl SourceOperator { } } } else if let Some(ref indexed_values) = self.primary_key_values { - if 
self.query_params.labels.len() != 1 { - Err(FnGenError::unsupported_error(&format!( - "Empty/Multiple labels in `IndexScan`, labels are {:?}", - self.query_params.labels - )))? + if self.query_params.labels.len() < 1 { + Err(FnGenError::unsupported_error( + "Empty label in `IndexScan` + self.query_params.labels", + ))? } - if let Some(v) = graph.index_scan_vertex( - self.query_params.labels[0], - indexed_values, - &self.query_params, - )? { - v_source = Box::new(vec![v].into_iter()) + let mut source_vertices = vec![]; + for label in &self.query_params.labels { + if let Some(v) = + graph.index_scan_vertex(*label, indexed_values, &self.query_params)? + { + source_vertices.push(v); + } } + v_source = Box::new(source_vertices.into_iter()); } else { // parallel scan, and each worker should scan the partitions assigned to it in self.v_params.partitions v_source = graph.scan_vertex(&self.query_params)?; From b537894de4c591243dc140c8090b912270b9e8a1 Mon Sep 17 00:00:00 2001 From: "bingqing.lbq" Date: Mon, 17 Jul 2023 01:30:43 +0000 Subject: [PATCH 2/3] minor refine Committed-by: bingqing.lbq from Dev container --- .../executor/ir/core/src/plan/logical.rs | 20 +++++++++---------- .../ir/runtime/src/process/operator/source.rs | 5 ++--- 2 files changed, 12 insertions(+), 13 deletions(-) diff --git a/interactive_engine/executor/ir/core/src/plan/logical.rs b/interactive_engine/executor/ir/core/src/plan/logical.rs index dd1eec5821e6..5b635d967078 100644 --- a/interactive_engine/executor/ir/core/src/plan/logical.rs +++ b/interactive_engine/executor/ir/core/src/plan/logical.rs @@ -703,6 +703,9 @@ fn triplet_to_index_predicate( if meta.schema.is_none() { return Ok(None); } + if tables.is_empty() { + return Ok(None); + } let schema = meta.schema.as_ref().unwrap(); let mut key = None; let mut is_eq = false; @@ -1087,16 +1090,13 @@ impl AsLogical for pb::Scan { } if let Some(params) = self.params.as_mut() { if self.idx_predicate.is_none() { - if !params.tables.is_empty() { - let mut 
idx_pred = None; - if let Some(expr) = &params.predicate { - idx_pred = triplet_to_index_predicate( - expr.operators.as_slice(), - &params.tables, - self.scan_opt != 1, - meta, - )?; - } + if let Some(expr) = &params.predicate { + let idx_pred = triplet_to_index_predicate( + expr.operators.as_slice(), + &params.tables, + self.scan_opt != 1, + meta, + )?; if idx_pred.is_some() { params.predicate = None; diff --git a/interactive_engine/executor/ir/runtime/src/process/operator/source.rs b/interactive_engine/executor/ir/runtime/src/process/operator/source.rs index 1a9dcc7b312e..67c8730556a9 100644 --- a/interactive_engine/executor/ir/runtime/src/process/operator/source.rs +++ b/interactive_engine/executor/ir/runtime/src/process/operator/source.rs @@ -135,10 +135,9 @@ impl SourceOperator { } } } else if let Some(ref indexed_values) = self.primary_key_values { - if self.query_params.labels.len() < 1 { + if self.query_params.labels.is_empty() { Err(FnGenError::unsupported_error( - "Empty label in `IndexScan` - self.query_params.labels", + "Empty label in `IndexScan` self.query_params.labels", ))? 
} let mut source_vertices = vec![]; From a69cf3964d1375dc71ee1dcf425bbe282b93834b Mon Sep 17 00:00:00 2001 From: "bingqing.lbq" Date: Mon, 17 Jul 2023 04:30:32 +0000 Subject: [PATCH 3/3] [CI Tests] add pkscan ci tests in test_min Committed-by: bingqing.lbq from Dev container --- python/graphscope/tests/minitest/test_min.py | 22 ++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/python/graphscope/tests/minitest/test_min.py b/python/graphscope/tests/minitest/test_min.py index 6a759b7e5a90..5201a3b6913e 100644 --- a/python/graphscope/tests/minitest/test_min.py +++ b/python/graphscope/tests/minitest/test_min.py @@ -310,6 +310,27 @@ def subgraph_roundtrip(num_workers, threads_per_worker): assert make_edges_set(edges) == make_edges_set(subgraph_edges) session.close() + def pkscan_roundtrip(num_workers, threads_per_worker): + logger.info( + "testing pkscan with %d workers and %d threads per worker", + num_workers, + threads_per_worker, + ) + + query1 = "g.V().hasLabel('person').has('id', 1)" + query2 = "g.V().hasLabel('person','software').has('id', 1)" + session = graphscope.session(cluster_type="hosts", num_workers=num_workers) + + g0 = load_modern_graph(session) + interactive0 = session.gremlin(g0) + query1_res = interactive0.execute(query1).all().result() + query2_res = interactive0.execute(query2).all().result() + logger.info("query1_res = %s", query1_res) + logger.info("query2_res = %s", query2_res) + assert len(query1_res) == 1 + assert len(query2_res) == 1 + session.close() + with vineyard.envvars( { "RUST_LOG": "debug", @@ -318,3 +339,4 @@ def subgraph_roundtrip(num_workers, threads_per_worker): } ): subgraph_roundtrip(num_workers, threads_per_worker) + pkscan_roundtrip(num_workers, threads_per_worker)