Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(interactive): fix the bug in filtering after intersection in GIE Runtime in distributed computation #3359

Merged
merged 19 commits into from
Nov 22, 2023
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 18 additions & 10 deletions flex/engines/hqps_db/database/adj_list.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,10 @@ class SinglePropGetter {
using value_type = T;
static constexpr size_t prop_num = 1;
SinglePropGetter() {}
SinglePropGetter(std::shared_ptr<TypedRefColumn<T>> c) : column(c) {
CHECK(column.get() != nullptr);
}
SinglePropGetter(std::shared_ptr<TypedRefColumn<T>> c) : column(c) {}

inline value_type get_view(vid_t vid) const {
if (vid == NONE) {
if (vid == NONE || column == nullptr) {
return NullRecordCreator<value_type>::GetNull();
}
return column->get_view(vid);
Expand Down Expand Up @@ -149,15 +147,25 @@ class MultiPropGetter {
if (vid == NONE) {
return NullRecordCreator<result_tuple_t>::GetNull();
}
return get_view(vid, std::make_index_sequence<sizeof...(T)>());
result_tuple_t ret;
fill_result_tuple(ret, vid);
return ret;
}

template <size_t... Is>
inline result_tuple_t get_view(vid_t vid, std::index_sequence<Is...>) const {
if (vid == NONE) {
return NullRecordCreator<result_tuple_t>::GetNull();
template <size_t I = 0>
inline typename std::enable_if<I == sizeof...(T), void>::type
fill_result_tuple(result_tuple_t& ret, vid_t vid) const {}

template <size_t I = 0>
inline typename std::enable_if<(I < sizeof...(T)), void>::type
fill_result_tuple(result_tuple_t& ret, vid_t vid) const {
using cur_ele_t = typename std::tuple_element<I, result_tuple_t>::type;
if (std::get<I>(column) == nullptr) {
std::get<I>(ret) = NullRecordCreator<cur_ele_t>::GetNull();
} else {
std::get<I>(ret) = std::get<I>(column)->get_view(vid);
}
return std::make_tuple(std::get<Is>(column)->get_view(vid)...);
fill_result_tuple<I + 1>(ret, vid);
}

inline MultiPropGetter<T...>& operator=(const MultiPropGetter<T...>& d) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ public abstract class PatternQueryTest extends AbstractGremlinProcessTest {

public abstract Traversal<Vertex, Long> get_pattern_16_test();

public abstract Traversal<Vertex, Long> get_pattern_17_test();

@Test
public void run_pattern_1_test() {
Traversal<Vertex, Long> traversal = this.get_pattern_1_test();
Expand Down Expand Up @@ -170,6 +172,13 @@ public void run_pattern_16_test() {
Assert.assertEquals(23286L, traversal.next().longValue());
}

@Test
public void run_pattern_17_test() {
Traversal<Vertex, Long> traversal = this.get_pattern_17_test();
this.printTraversalForm(traversal);
Assert.assertEquals(17367L, traversal.next().longValue());
}

public static class Traversals extends PatternQueryTest {

// PM1
Expand Down Expand Up @@ -356,5 +365,26 @@ public Traversal<Vertex, Long> get_pattern_16_test() {
.by("firstName")
.count();
}

@Override
public Traversal<Vertex, Long> get_pattern_17_test() {
return g.V().match(
__.as("a")
.hasLabel("PERSON")
.out("HASINTEREST")
.hasLabel("TAG")
.as("b"),
__.as("a")
.hasLabel("PERSON")
.in("HASCREATOR")
.hasLabel("COMMENT", "POST")
.as("c"),
__.as("c")
.hasLabel("COMMENT", "POST")
.out("HASTAG")
.hasLabel("TAG")
.as("b"))
.count();
}
}
}
25 changes: 18 additions & 7 deletions interactive_engine/executor/ir/common/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -687,13 +687,24 @@ impl From<(pb::EdgeExpand, pb::GetV)> for pb::path_expand::ExpandBase {
}

impl pb::QueryParams {
// is_queryable doesn't consider tables as we assume that the table info can be inferred directly from current data.
pub fn is_queryable(&self) -> bool {
!(self.predicate.is_none()
&& self.limit.is_none()
&& self.sample_ratio == 1.0
&& self.columns.is_empty()
&& !self.is_all_columns)
pub fn has_labels(&self) -> bool {
!self.tables.is_empty()
}

pub fn has_columns(&self) -> bool {
!self.columns.is_empty() || self.is_all_columns
}

pub fn has_predicates(&self) -> bool {
self.predicate.is_some()
}

pub fn has_sample(&self) -> bool {
self.sample_ratio != 1.0
}

pub fn has_limit(&self) -> bool {
self.limit.is_some()
}

pub fn is_empty(&self) -> bool {
Expand Down
8 changes: 7 additions & 1 deletion interactive_engine/executor/ir/core/src/plan/logical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1477,7 +1477,13 @@ fn is_whole_graph(operator: &pb::logical_plan::Operator) -> bool {
&& scan
.params
.as_ref()
.map(|params| !params.is_queryable() && is_params_all_labels(params))
.map(|params| {
!(params.has_columns()
|| params.has_predicates()
|| params.has_sample()
|| params.has_limit())
&& is_params_all_labels(params)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can is_params_all_labels be replaced with params.has_labels()?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe not. Here we have to confirm it is the "whole graph", that it either specifies all labels, or no labels (also indicates that all labels is allowed).

})
.unwrap_or(true)
}
pb::logical_plan::operator::Opr::Root(_) => true,
Expand Down
16 changes: 11 additions & 5 deletions interactive_engine/executor/ir/core/src/plan/physical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,10 @@ impl AsPhysical for pb::PathExpand {
let getv = getv.unwrap();
if edge_expand.expand_opt == pb::edge_expand::ExpandOpt::Edge as i32 {
let has_getv_filter = if let Some(params) = getv.params.as_ref() {
params.is_queryable() || !params.tables.is_empty()
// In RBO, sample_ratio and limit would be fused into EdgeExpand(Opt=Edge), rather than GetV,
// thus we do not consider them here.
// TODO: Notice that we consider table here since we cannot specify vertex labels in ExpandV
params.has_predicates() || params.has_columns() || params.has_labels()
} else {
false
};
Expand Down Expand Up @@ -448,9 +451,9 @@ fn build_and_try_fuse_get_v(builder: &mut PlanBuilder, mut get_v: pb::GetV) -> I
return Err(IrError::Unsupported("Try to fuse GetV with Opt=Self into ExpandE".to_string()));
}
if let Some(params) = get_v.params.as_mut() {
if params.is_queryable() {
if params.has_predicates() || params.has_columns() {
return Err(IrError::Unsupported("Try to fuse GetV with predicates into ExpandE".to_string()));
} else if !params.tables.is_empty() {
} else if params.has_labels() {
// although this doesn't need query, it cannot be fused into ExpandExpand since we cannot specify vertex labels in ExpandV
builder.get_v(get_v);
return Ok(());
Expand Down Expand Up @@ -502,7 +505,7 @@ impl AsPhysical for pb::GetV {
let mut getv = self.clone();
// If GetV(Adj) with filter, translate GetV into GetV(GetAdj) + Shuffle (if on distributed storage) + GetV(Self)
if let Some(params) = getv.params.as_mut() {
if params.is_queryable() {
if params.has_predicates() || params.has_columns() {
let auxilia = pb::GetV {
tag: None,
opt: 4, //ItSelf
Expand Down Expand Up @@ -1069,7 +1072,7 @@ fn add_intersect_job_builder(
// vertex parameter after the intersection
if let Some(params) = get_v.params.as_ref() {
// the case that we need to further process getV's filter.
if params.is_queryable() || !params.tables.is_empty() {
if params.has_predicates() || params.has_columns() || params.has_labels() {
get_v.opt = 4;
auxilia = Some(get_v.clone());
}
Expand All @@ -1089,6 +1092,9 @@ fn add_intersect_job_builder(
// add vertex filters
if let Some(mut auxilia) = auxilia {
auxilia.tag = Some(intersect_tag.clone());
if plan_meta.is_partition() {
builder.shuffle(Some(intersect_tag.clone()));
}
builder.get_v(auxilia);
}
Ok(())
Expand Down
4 changes: 2 additions & 2 deletions interactive_engine/executor/ir/runtime/src/assembly.rs
Original file line number Diff line number Diff line change
Expand Up @@ -627,8 +627,8 @@ impl<P: PartitionInfo, C: ClusterInfo> IRJobAssembly<P, C> {
OpKind::Scan(scan) => {
let udf_gen = self.udf_gen.clone();
stream = stream.flat_map(move |_| {
let scan_iter = udf_gen.gen_source(scan.clone().into()).unwrap();
Ok(scan_iter)
let scan_iter = udf_gen.gen_source(scan.clone().into());
Ok(scan_iter?)
})?;
}
OpKind::Sample(sample) => {
Expand Down
7 changes: 7 additions & 0 deletions interactive_engine/executor/ir/runtime/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,13 @@ impl From<GraphProxyError> for FnGenError {
}
}

impl From<FnGenError> for DynError {
fn from(e: FnGenError) -> Self {
let err: Box<dyn std::error::Error + Send + Sync> = e.into();
err
}
}

impl From<FnGenError> for BuildJobError {
fn from(e: FnGenError) -> Self {
match e {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,17 +159,22 @@ impl FilterMapFunction<Record, Record> for AuxiliaOperator {
// e.g., for g.V().out().as("a").has("name", "marko"), we should compile as:
// g.V().out().auxilia(as("a"))... where we give alias in auxilia,
// then we set tag=None and alias="a" in auxilia
// 1. filter by labels.

// 1. If to filter by labels, and the entry itself carries label information already, directly eval it without query the store
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can the following code be replaced with params.ha_labels()?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

if !self.query_params.labels.is_empty() && entry.label().is_some() {
if !self
.query_params
.labels
.contains(&entry.label().unwrap())
{
// pruning by labels
return Ok(None);
} else if self.query_params.filter.is_none() && self.query_params.columns.is_none() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use has_predicates & has_columns?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

// if only filter by labels, directly return the results.
return Ok(Some(input));
}
}
// 2. further fetch properties, e.g., filter by columns.
// 2. Otherwise, filter after query store, e.g., the case of filter by columns.
match entry.get_type() {
EntryType::Vertex => {
let graph = get_graph().ok_or_else(|| FnExecError::NullGraphError)?;
Expand Down Expand Up @@ -248,7 +253,7 @@ impl FilterMapFuncGen for pb::GetV {
VOpt::Start | VOpt::End | VOpt::Other => {
let mut tables_condition: Vec<LabelId> = vec![];
if let Some(params) = self.params {
if params.is_queryable() {
if params.has_predicates() || params.has_columns() {
Err(FnGenError::unsupported_error(&format!("QueryParams in GetV {:?}", params)))?
} else {
tables_condition = params
Expand Down
Loading