Skip to content

Commit

Permalink
Fix cypher query's unexected result
Browse files Browse the repository at this point in the history
fixing CI
  • Loading branch information
zhanglei1949 committed Apr 23, 2024
1 parent aea1278 commit 2f841e9
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 97 deletions.
35 changes: 24 additions & 11 deletions flex/engines/hqps_db/core/operator/edge_expand.h
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ class EdgeExpand {
const GeneralVertexSet<vertex_id_t, label_id_t, SET_T...>& cur_vertex_set,
Direction direction, label_id_t edge_label, label_id_t other_label,
const Filter<TruePredicate, SELECTOR...>& edge_filter,
size_t limit = INT_MAX) {
std::vector<label_id_t> valid_src_labels = {}, size_t limit = INT_MAX) {
VLOG(10) << "[EdgeExpandV] for general vertex set size: "
<< cur_vertex_set.Size();
auto state = EdgeExpandVState(graph, cur_vertex_set, direction, edge_label,
Expand All @@ -272,6 +272,12 @@ class EdgeExpand {
// std::tie(src_label, dst_label) =
// get_graph_label_pair(direction, cur_label, state.other_label_);

if (valid_src_labels.size() > 0 &&
std::find(valid_src_labels.begin(), valid_src_labels.end(),
src_label) == valid_src_labels.end()) {
continue;
}

VLOG(10) << "[EdgeExpandV]: "
<< "edge label: " << std::to_string(state.edge_label_)
<< "src: " << std::to_string(src_label)
Expand Down Expand Up @@ -304,6 +310,10 @@ class EdgeExpand {
}
VLOG(10) << "vids size: " << vids.size();
VLOG(10) << "offset: " << gs::to_string(offset);
LOG(INFO) << "got vertices of label: " << gs::to_string(state.other_label_)
<< ", vid : " << gs::to_string(vids)
<< "src label:" << gs::to_string(valid_src_labels)
<< ", edge label: " << gs::to_string(edge_label);
vertex_set_t result_set(std::move(vids), state.other_label_);
auto pair = std::make_pair(std::move(result_set), std::move(offset));
return pair;
Expand Down Expand Up @@ -459,24 +469,27 @@ class EdgeExpand {
CHECK(edge_triplets.size() > 0);
// result_set_t is the type of calling EdgeExpandV with cur_vertex_set
// and edge_triplets[i]
using result_set_t = decltype(
EdgeExpandV(graph, cur_vertex_set, direction, edge_triplets[0][2],
edge_triplets[0][1], std::move(edge_filter))
.first);
using result_set_t =
decltype(EdgeExpandV(graph, cur_vertex_set, direction,
edge_triplets[0][2], edge_triplets[0][1],
std::move(edge_filter))
.first);
using result_pair_t = std::pair<result_set_t, std::vector<offset_t>>;
std::vector<result_pair_t> result_pairs;
for (size_t i = 0; i < edge_triplets.size(); ++i) {
if (direction == Direction::In || direction == Direction::Both) {
auto copied_filter = edge_filter;
result_pairs.emplace_back(EdgeExpandV(
graph, cur_vertex_set, Direction::In, edge_triplets[i][2],
edge_triplets[i][0], std::move(copied_filter)));
result_pairs.emplace_back(
EdgeExpandV(graph, cur_vertex_set, Direction::In,
edge_triplets[i][2], edge_triplets[i][0],
std::move(copied_filter), {edge_triplets[i][1]}));
}
if (direction == Direction::Out || direction == Direction::Both) {
auto copied_filter = edge_filter;
result_pairs.emplace_back(EdgeExpandV(
graph, cur_vertex_set, Direction::Out, edge_triplets[i][2],
edge_triplets[i][1], std::move(copied_filter)));
result_pairs.emplace_back(
EdgeExpandV(graph, cur_vertex_set, Direction::Out,
edge_triplets[i][2], edge_triplets[i][1],
std::move(copied_filter), {edge_triplets[i][0]}));
}
}

Expand Down
81 changes: 0 additions & 81 deletions flex/engines/hqps_db/core/operator/path_expand.h
Original file line number Diff line number Diff line change
Expand Up @@ -450,80 +450,6 @@ class PathExpand {
return std::make_pair(std::move(set), std::move(res_offsets));
}

template <typename LabelT, typename EDGE_FILTER_T, typename... SELECTOR>
static std::tuple<std::vector<vertex_id_t>, std::vector<Dist>,
std::vector<offset_t>>
PathExpandRawV2ForSingleV(
const GRAPH_INTERFACE& graph, LabelT src_label,
const std::vector<vertex_id_t>& src_vertices_vec, Range& range,
EdgeExpandOpt<LabelT, EDGE_FILTER_T, SELECTOR...>& edge_expand_opt) {
// auto src_label = vertex_set.GetLabel();
// auto src_vertices_vec = vertex_set.GetVertices();
vertex_id_t src_id = src_vertices_vec[0];

std::vector<vertex_id_t> gids;
std::vector<vertex_id_t> tmp_vec;
std::vector<offset_t> offsets;
// std::vector<std::vector<vertex_id_t>> gids;
// std::vector<std::vector<offset_t>> offsets;
std::unordered_set<vertex_id_t> visited_vertices;
std::vector<Dist> dists;

// init for index 0
tmp_vec.emplace_back(src_id);
visited_vertices.insert(src_id);
if (range.start_ == 0) {
gids.emplace_back(src_id);
dists.emplace_back(0);
}

label_id_t real_src_label, dst_label;
std::tie(real_src_label, dst_label) = get_graph_label_pair(
edge_expand_opt.dir_, src_label, edge_expand_opt.other_label_);

double visit_array_time = 0.0;
for (size_t cur_hop = 1; cur_hop < range.limit_; ++cur_hop) {
std::vector<size_t> unused;
std::tie(tmp_vec, unused) = graph.GetOtherVerticesV2(
real_src_label, dst_label, edge_expand_opt.edge_label_, tmp_vec,
gs::to_string(edge_expand_opt.dir_), INT_MAX);
// remove duplicate
size_t limit = 0;
for (size_t i = 0; i < tmp_vec.size(); ++i) {
if (visited_vertices.find(tmp_vec[i]) == visited_vertices.end()) {
tmp_vec[limit++] = tmp_vec[i];
}
}
tmp_vec.resize(limit);
if (cur_hop >= range.start_) {
// emplace tmp_vec to gids;
for (size_t i = 0; i < tmp_vec.size(); ++i) {
auto nbr_gid = tmp_vec[i];
auto insert_res = visited_vertices.insert(nbr_gid);
if (insert_res.second) {
gids.emplace_back(nbr_gid);
dists.emplace_back(cur_hop);
}
}
} else {
// when cur_hop is not included, we also need to insert vertices into
// set, to avoid duplicated.
for (size_t i = 0; i < tmp_vec.size(); ++i) {
auto nbr_gid = tmp_vec[i];
visited_vertices.insert(nbr_gid);
}
}
}
LOG(INFO) << "visit array time: " << visit_array_time
<< ", gid size: " << gids.size();
// select vertices that are in range.
offsets.emplace_back(0);
offsets.emplace_back(gids.size());

return std::make_tuple(std::move(gids), std::move(dists),
std::move(offsets));
}

// TODO: dedup can be used to speed up the query when the input vertices
// size if 1.
// const VERTEX_SET_T& vertex_set,
Expand All @@ -537,13 +463,6 @@ class PathExpand {
// auto src_label = vertex_set.GetLabel();
// auto src_vertices_vec = vertex_set.GetVertices();
auto src_vertices_size = src_vertices_vec.size();
if (src_vertices_size == 1) {
LOG(INFO)
<< "[NOTE:] PathExpandRawVMultiV is used for single vertex expand, "
"dedup is enabled.";
return PathExpandRawV2ForSingleV(graph, src_label, src_vertices_vec,
range, edge_expand_opt);
}
std::vector<std::vector<vertex_id_t>> gids;
std::vector<std::vector<offset_t>> offsets;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,9 +175,9 @@ public static QueryContext get_simple_match_query_13_test() {
Arrays.asList(
"Record<{aId: 0, bId: 3}>",
"Record<{aId: 0, bId: 59}>",
"Record<{aId: 0, bId: 349}>",
"Record<{aId: 0, bId: 933}>",
"Record<{aId: 0, bId: 1454}>");
"Record<{aId: 0, bId: 59}>",
"Record<{aId: 0, bId: 291}>",
"Record<{aId: 0, bId: 349}>");
return new QueryContext(query, expected);
}

Expand Down Expand Up @@ -211,8 +211,8 @@ public static QueryContext get_simple_match_query_15_test() {
}

public static QueryContext get_simple_match_query_16_test() {
String query = "Match (a)-[c*0..2]->(b) RETURN COUNT (DISTINCT c);";
List<String> expected = Arrays.asList("Record<{$f0: 1552803}>");
String query = "Match (a:TAG)<-[c*1..2]-(b) RETURN COUNT(c);";
List<String> expected = Arrays.asList("Record<{$f0: 325593}>");
return new QueryContext(query, expected);
}
}

0 comments on commit 2f841e9

Please sign in to comment.