Skip to content

Commit

Permalink
Add degree threshold to katz_centrality, clustering and avg_clusterin…
Browse files Browse the repository at this point in the history
…g algorithm (#1507)


* Upgrade vineyard version to 0.6.0

Co-authored-by: lidongze0629 <[email protected]>
  • Loading branch information
siyuan0322 and lidongze0629 authored Jul 7, 2022
1 parent d1e5076 commit 889e003
Show file tree
Hide file tree
Showing 22 changed files with 124 additions and 62 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/gae.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ jobs:
runs-on: ubuntu-20.04
if: ${{ github.repository == 'alibaba/GraphScope' }}
container:
image: registry.cn-hongkong.aliyuncs.com/graphscope/graphscope-vineyard:v0.5.3
image: registry.cn-hongkong.aliyuncs.com/graphscope/graphscope-vineyard:v0.6.0
options:
--shm-size 4096m
steps:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/networkx-forward-algo-nightly.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ jobs:
run:
shell: bash --noprofile --norc -eo pipefail {0}
container:
image: registry.cn-hongkong.aliyuncs.com/graphscope/graphscope-vineyard:v0.5.3
image: registry.cn-hongkong.aliyuncs.com/graphscope/graphscope-vineyard:v0.6.0
options:
--shm-size 4096m

Expand Down
2 changes: 1 addition & 1 deletion analytical_engine/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ endif ()
find_package(libgrapelite REQUIRED)
include_directories(${LIBGRAPELITE_INCLUDE_DIRS})

find_package(vineyard 0.5.3 REQUIRED)
find_package(vineyard 0.6.0 REQUIRED)
include_directories(${VINEYARD_INCLUDE_DIRS})
add_compile_options(-DENABLE_SELECTOR)

Expand Down
71 changes: 43 additions & 28 deletions analytical_engine/apps/centrality/katz/katz_centrality.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,39 +87,43 @@ class KatzCentrality
auto inner_vertices = frag.InnerVertices();
if (frag.directed()) {
ForEach(inner_vertices.begin(), inner_vertices.end(),
[&ctx, &frag, &messages](int tid, vertex_t v) {
auto es = frag.GetIncomingAdjList(v);
ctx.x[v] = 0;
for (auto& e : es) {
// do the multiplication y^T = Alpha * x^T A - Beta
double edata = 1.0;
static_if<!std::is_same<edata_t, grape::EmptyType>{}>(
[&](auto& e, auto& data) {
data = static_cast<double>(e.get_data());
})(e, edata);
ctx.x[v] += ctx.x_last[e.get_neighbor()] * edata;
[this, &ctx, &frag, &messages](int tid, vertex_t v) {
if (!filterByDegree(frag, ctx, v)) {
auto es = frag.GetIncomingAdjList(v);
ctx.x[v] = 0;
for (auto& e : es) {
// do the multiplication y^T = Alpha * x^T A - Beta
double edata = 1.0;
static_if<!std::is_same<edata_t, grape::EmptyType>{}>(
[&](auto& e, auto& data) {
data = static_cast<double>(e.get_data());
})(e, edata);
ctx.x[v] += ctx.x_last[e.get_neighbor()] * edata;
}
ctx.x[v] = ctx.x[v] * ctx.alpha + ctx.beta;
messages.Channels()[tid].SendMsgThroughOEdges(frag, v,
ctx.x[v]);
}
ctx.x[v] = ctx.x[v] * ctx.alpha + ctx.beta;
messages.Channels()[tid].SendMsgThroughOEdges(frag, v,
ctx.x[v]);
});
} else {
ForEach(inner_vertices.begin(), inner_vertices.end(),
[&ctx, &frag, &messages](int tid, vertex_t v) {
auto es = frag.GetOutgoingAdjList(v);
ctx.x[v] = 0;
for (auto& e : es) {
// do the multiplication y^T = Alpha * x^T A - Beta
double edata = 1.0;
static_if<!std::is_same<edata_t, grape::EmptyType>{}>(
[&](auto& e, auto& data) {
data = static_cast<double>(e.get_data());
})(e, edata);
ctx.x[v] += ctx.x_last[e.get_neighbor()] * edata;
[this, &ctx, &frag, &messages](int tid, vertex_t v) {
if (!filterByDegree(frag, ctx, v)) {
auto es = frag.GetOutgoingAdjList(v);
ctx.x[v] = 0;
for (auto& e : es) {
// do the multiplication y^T = Alpha * x^T A - Beta
double edata = 1.0;
static_if<!std::is_same<edata_t, grape::EmptyType>{}>(
[&](auto& e, auto& data) {
data = static_cast<double>(e.get_data());
})(e, edata);
ctx.x[v] += ctx.x_last[e.get_neighbor()] * edata;
}
ctx.x[v] = ctx.x[v] * ctx.alpha + ctx.beta;
messages.Channels()[tid].SendMsgThroughOEdges(frag, v,
ctx.x[v]);
}
ctx.x[v] = ctx.x[v] * ctx.alpha + ctx.beta;
messages.Channels()[tid].SendMsgThroughOEdges(frag, v,
ctx.x[v]);
});
}
}
Expand Down Expand Up @@ -165,6 +169,17 @@ class KatzCentrality
}
ctx.curr_round++;
}

bool filterByDegree(const fragment_t& frag, context_t& ctx, vertex_t v) {
int degree = frag.GetLocalOutDegree(v);
if (frag.directed()) {
degree += frag.GetLocalInDegree(v);
}
if (degree > ctx.degree_threshold) {
return true;
}
return false;
}
};
} // namespace gs

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ class KatzCentralityContext : public grape::VertexDataContext<FRAG_T, double> {
x(this->data()) {}

void Init(grape::ParallelMessageManager& messages, double alpha, double beta,
double tolerance, int max_round, bool normalized) {
double tolerance, int max_round, bool normalized,
int degree_threshold = std::numeric_limits<int>::max()) {
auto& frag = this->fragment();
auto vertices = frag.Vertices();

Expand All @@ -45,6 +46,7 @@ class KatzCentralityContext : public grape::VertexDataContext<FRAG_T, double> {
this->tolerance = tolerance;
this->max_round = max_round;
this->normalized = normalized;
this->degree_threshold = degree_threshold;
curr_round = 0;
}

Expand All @@ -66,6 +68,7 @@ class KatzCentralityContext : public grape::VertexDataContext<FRAG_T, double> {
double global_sum;
int max_round;
bool normalized;
int degree_threshold;
int curr_round;
};
} // namespace gs
Expand Down
21 changes: 19 additions & 2 deletions analytical_engine/apps/clustering/avg_clustering.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,10 @@ class AvgClustering
[&ctx](int tid, vertex_t u, int msg) { ctx.global_degree[u] = msg; });

ForEach(inner_vertices.begin(), inner_vertices.end(),
[&frag, &ctx, &messages, &vertices](int tid, vertex_t v) {
[this, &frag, &ctx, &messages, &vertices](int tid, vertex_t v) {
if (filterByDegree(frag, ctx, v)) {
return;
}
vid_t u_gid, v_gid;
auto& nbr_vec = ctx.complete_neighbor[v];
int degree = ctx.global_degree[v];
Expand Down Expand Up @@ -161,9 +164,12 @@ class AvgClustering
messages
.ParallelProcess<fragment_t, std::vector<std::pair<vid_t, uint32_t>>>(
thread_num(), frag,
[&frag, &ctx](
[this, &frag, &ctx](
int tid, vertex_t u,
const std::vector<std::pair<vid_t, uint32_t>>& msg) {
if (frag.IsInnerVertex(u) && filterByDegree(frag, ctx, u)) {
return;
}
auto& nbr_vec = ctx.complete_neighbor[u];
for (auto m : msg) {
auto gid = m.first;
Expand Down Expand Up @@ -239,6 +245,17 @@ class AvgClustering
}
}
}

bool filterByDegree(const fragment_t& frag, context_t& ctx, vertex_t v) {
int degree = frag.GetLocalOutDegree(v);
if (frag.directed()) {
degree += frag.GetLocalInDegree(v);
}
if (degree > ctx.degree_threshold) {
return true;
}
return false;
}
};
} // namespace gs

Expand Down
5 changes: 4 additions & 1 deletion analytical_engine/apps/clustering/avg_clustering_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ class AvgClusteringContext : public TensorContext<FRAG_T, float> {
explicit AvgClusteringContext(const FRAG_T& fragment)
: TensorContext<FRAG_T, float>(fragment) {}

void Init(grape::ParallelMessageManager& messages) {
void Init(grape::ParallelMessageManager& messages,
int degree_threshold = std::numeric_limits<int>::max()) {
auto& frag = this->fragment();
auto vertices = frag.Vertices();
auto inner_vertices = frag.InnerVertices();
Expand All @@ -50,6 +51,7 @@ class AvgClusteringContext : public TensorContext<FRAG_T, float> {
rec_degree.Init(inner_vertices, 0);
complete_neighbor.Init(vertices);
tricnt.Init(vertices, 0);
this->degree_threshold = degree_threshold;
}

void Output(std::ostream& os) override {
Expand All @@ -67,6 +69,7 @@ class AvgClusteringContext : public TensorContext<FRAG_T, float> {
std::vector<std::pair<vertex_t, uint32_t>>>
complete_neighbor;
typename FRAG_T::template vertex_array_t<int> tricnt;
int degree_threshold = 0;
float total_clustering = 0.0;
int stage = 0;
};
Expand Down
20 changes: 18 additions & 2 deletions analytical_engine/apps/clustering/clustering.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,10 @@ class Clustering
[&ctx](int tid, vertex_t u, int msg) { ctx.global_degree[u] = msg; });

ForEach(inner_vertices.begin(), inner_vertices.end(),
[&frag, &ctx, &messages, &vertices](int tid, vertex_t v) {
[this, &frag, &ctx, &messages, &vertices](int tid, vertex_t v) {
if (filterByDegree(frag, ctx, v)) {
return;
}
int degree = ctx.global_degree[v];
if (degree > 1) {
vid_t u_gid, v_gid;
Expand Down Expand Up @@ -172,9 +175,12 @@ class Clustering
messages
.ParallelProcess<fragment_t, std::vector<std::pair<vid_t, uint32_t>>>(
thread_num(), frag,
[&frag, &ctx](
[this, &frag, &ctx](
int tid, vertex_t u,
const std::vector<std::pair<vid_t, uint32_t>>& msg) {
if (frag.IsInnerVertex(u) && filterByDegree(frag, ctx, u)) {
return;
}
auto& nbr_vec = ctx.complete_neighbor[u];
for (auto m : msg) {
auto gid = m.first;
Expand Down Expand Up @@ -253,6 +259,16 @@ class Clustering
}
}
}
bool filterByDegree(const fragment_t& frag, context_t& ctx, vertex_t v) {
int degree = frag.GetLocalOutDegree(v);
if (frag.directed()) {
degree += frag.GetLocalInDegree(v);
}
if (degree > ctx.degree_threshold) {
return true;
}
return false;
}
};
} // namespace gs

Expand Down
5 changes: 4 additions & 1 deletion analytical_engine/apps/clustering/clustering_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ class ClusteringContext : public grape::VertexDataContext<FRAG_T, double> {
explicit ClusteringContext(const FRAG_T& fragment)
: grape::VertexDataContext<FRAG_T, double>(fragment) {}

void Init(grape::ParallelMessageManager& messages) {
void Init(grape::ParallelMessageManager& messages,
int degree_threshold = std::numeric_limits<int>::max()) {
auto& frag = this->fragment();
auto vertices = frag.Vertices();
auto inner_vertices = frag.InnerVertices();
Expand All @@ -48,6 +49,7 @@ class ClusteringContext : public grape::VertexDataContext<FRAG_T, double> {
rec_degree.Init(inner_vertices, 0);
complete_neighbor.Init(vertices);
tricnt.Init(vertices, 0);
this->degree_threshold = degree_threshold;
}

void Output(std::ostream& os) override {
Expand All @@ -73,6 +75,7 @@ class ClusteringContext : public grape::VertexDataContext<FRAG_T, double> {
std::vector<std::pair<vertex_t, uint32_t>>>
complete_neighbor;
typename FRAG_T::template vertex_array_t<uint32_t> tricnt;
int degree_threshold = 0;

int stage = 0;
};
Expand Down
2 changes: 1 addition & 1 deletion analytical_engine/core/grape_instance.cc
Original file line number Diff line number Diff line change
Expand Up @@ -381,12 +381,12 @@ bl::result<rpc::graph::GraphDefPb> GrapeInstance::modifyEdges(
graph_info.set_property_schema_json(
dynamic::Stringify(fragment->GetSchema()));
graph_def.mutable_extension()->PackFrom(graph_info);
return graph_def;
#else
RETURN_GS_ERROR(vineyard::ErrorCode::kUnimplementedMethod,
"GraphScope is built with NETWORKX=OFF, please recompile it "
"with NETWORKX=ON");
#endif // NETWORKX
return graph_def;
}

bl::result<std::shared_ptr<grape::InArchive>> GrapeInstance::contextToNumpy(
Expand Down
2 changes: 1 addition & 1 deletion coordinator/gscoordinator/template/CMakeLists.template
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ if (libgrapelite_FOUND)
endif()

# find vineyard-----------------------------------------
find_package(vineyard 0.5.3 QUIET)
find_package(vineyard 0.6.0 QUIET)
if (vineyard_FOUND)
include_directories(AFTER SYSTEM ${VINEYARD_INCLUDE_DIRS})
endif()
Expand Down
2 changes: 1 addition & 1 deletion interactive_engine/executor/runtime/native/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ find_package(Threads REQUIRED)
# we need edge src/dst ids in etable.
add_definitions(-DENDPOINT_LISTS)

find_package(vineyard 0.5.3 REQUIRED)
find_package(vineyard 0.6.0 REQUIRED)
add_library(native_store global_store_ffi.cc
htap_ds_impl.cc
graph_builder_ffi.cc
Expand Down
2 changes: 1 addition & 1 deletion k8s/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ else
endif

VERSION ?= latest
VINEYARD_VERSION ?= v0.5.3
VINEYARD_VERSION ?= v0.6.0
PROFILE ?= release


Expand Down
2 changes: 1 addition & 1 deletion k8s/actions-runner-controller/manylinux/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM registry.cn-hongkong.aliyuncs.com/graphscope/graphscope-vineyard:v0.5.3
FROM registry.cn-hongkong.aliyuncs.com/graphscope/graphscope-vineyard:v0.6.0

ARG TARGETPLATFORM
ARG RUNNER_VERSION=2.287.1
Expand Down
2 changes: 1 addition & 1 deletion k8s/graphscope-dev.Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
# the result image includes all runtime stuffs of graphscope, with analytical engine,
# learning engine and interactive engine installed.

ARG BASE_VERSION=v0.5.3
ARG BASE_VERSION=v0.6.0
FROM registry.cn-hongkong.aliyuncs.com/graphscope/graphscope-vineyard:$BASE_VERSION as builder

ARG NETWORKX=ON
Expand Down
2 changes: 1 addition & 1 deletion k8s/graphscope-store.Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
ARG BASE_VERSION=v0.5.3
ARG BASE_VERSION=v0.6.0
FROM registry.cn-hongkong.aliyuncs.com/graphscope/graphscope-vineyard:$BASE_VERSION as builder

ARG CI=true
Expand Down
2 changes: 1 addition & 1 deletion k8s/gsvineyard.Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ RUN sudo mkdir -p /opt/vineyard && \
make -j`nproc` && \
make install && \
cd /tmp && \
git clone -b v0.5.3 https://github.com/v6d-io/v6d.git --depth=1 && \
git clone -b v0.6.0 https://github.com/v6d-io/v6d.git --depth=1 && \
cd v6d && \
git submodule update --init && \
mkdir -p /tmp/v6d/build && \
Expand Down
2 changes: 1 addition & 1 deletion k8s/ubuntu/gsvineyard.Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ RUN cd /tmp && \
make -j`nproc` && \
make install && \
cd /tmp && \
git clone -b v0.5.3 https://github.com/v6d-io/v6d.git --depth=1 && \
git clone -b v0.6.0 https://github.com/v6d-io/v6d.git --depth=1 && \
cd v6d && \
git submodule update --init && \
mkdir -p /tmp/v6d/build && \
Expand Down
Loading

0 comments on commit 889e003

Please sign in to comment.