Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

parallel implementation for all_pairs_bellman_ford_path #14

Merged
merged 7 commits into from
Dec 5, 2023
1 change: 1 addition & 0 deletions nx_parallel/algorithms/shortest_paths/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .weighted import *
69 changes: 69 additions & 0 deletions nx_parallel/algorithms/shortest_paths/weighted.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
from joblib import Parallel, delayed
from networkx.algorithms.shortest_paths.weighted import (
single_source_bellman_ford_path
)

import nx_parallel as nxp

__all__ = ["all_pairs_bellman_ford_path"]


def all_pairs_bellman_ford_path(G, weight="weight"):
"""Compute shortest paths between all nodes in a weighted graph.

Parameters
----------
G : NetworkX graph

weight : string or function (default="weight")
If this is a string, then edge weights will be accessed via the
edge attribute with this key (that is, the weight of the edge
joining `u` to `v` will be ``G.edges[u, v][weight]``). If no
such edge attribute exists, the weight of the edge is assumed to
be one.

If this is a function, the weight of an edge is the value
returned by the function. The function must accept exactly three
positional arguments: the two endpoints of an edge and the
dictionary of edge attributes for that edge. The function must
return a number.

Returns
-------
all_paths :
dictionary keyed by source with value as another dictionary keyed
by target and shortest path as its key value.

Notes
-----
Edge weight attributes must be numerical.
Distances are calculated as sums of weighted edges traversed.

"""
if hasattr(G, "graph_object"):
G = G.graph_object

nodes = G.nodes

total_cores = nxp.cpu_count()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should be moving this to the function signature and match something like what scikit-learn does (n_jobs) but not a blocker here.


num_in_chunk = max(len(nodes) // total_cores, 1)
node_chunks = nxp.chunks(nodes, num_in_chunk)

paths_list = Parallel(n_jobs=total_cores)(delayed(_calculate_shortest_paths_subset)(G, chunk, weight) for chunk in node_chunks)


all_paths = {}
for result in paths_list:
for source, paths in result.items():
all_paths[source]=paths
return all_paths


# Helper function
def _calculate_shortest_paths_subset(G, chunk, weight):
result = {}
for source in chunk:
paths = single_source_bellman_ford_path(G, source, weight=weight)
result[source] = paths
return result
4 changes: 4 additions & 0 deletions nx_parallel/interface.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from nx_parallel.algorithms.centrality.betweenness import betweenness_centrality
from nx_parallel.algorithms.shortest_paths.weighted import all_pairs_bellman_ford_path
from nx_parallel.algorithms.efficiency_measures import (
local_efficiency,
)
Expand Down Expand Up @@ -43,6 +44,9 @@ class Dispatcher:
# Efficiency
local_efficiency = local_efficiency

# Shortest Paths : all pairs shortest paths(bellman_ford)
all_pairs_bellman_ford_path = all_pairs_bellman_ford_path

# =============================

@staticmethod
Expand Down
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
113 changes: 57 additions & 56 deletions timing/timing_individual_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,67 +11,68 @@
heatmapDF = pd.DataFrame()
number_of_nodes_list = [10, 50, 100, 300, 500]
pList = [1, 0.8, 0.6, 0.4, 0.2]
currFun = nx.betweenness_centrality
for i in range(0, len(pList)):
p = pList[i]
for j in range(0, len(number_of_nodes_list)):
num = number_of_nodes_list[j]
funcs = [nx.betweenness_centrality, nx.all_pairs_bellman_ford_path]
for currFun in funcs:
for i in range(0, len(pList)):
p = pList[i]
for j in range(0, len(number_of_nodes_list)):
num = number_of_nodes_list[j]

# create original and parallel graphs
G = nx.fast_gnp_random_graph(num, p, directed=False)
H = nx_parallel.ParallelGraph(G)
# create original and parallel graphs
G = nx.fast_gnp_random_graph(num, p, directed=False)
H = nx_parallel.ParallelGraph(G)

# time both versions and update heatmapDF
t1 = time.time()
c = currFun(H)
t2 = time.time()
parallelTime = t2 - t1
t1 = time.time()
c = currFun(G)
t2 = time.time()
stdTime = t2 - t1
timesFaster = stdTime / parallelTime
heatmapDF.at[j, i] = timesFaster
print("Finished " + str(currFun))
# time both versions and update heatmapDF
t1 = time.time()
c = currFun(H)
t2 = time.time()
parallelTime = t2 - t1
t1 = time.time()
c = currFun(G)
t2 = time.time()
stdTime = t2 - t1
timesFaster = stdTime / parallelTime
heatmapDF.at[j, i] = timesFaster
print("Finished " + str(currFun))

# Code to create for row of heatmap specifically for tournaments
# for i in range(0, len(pList)):
# p = pList[i]
# for j in range(0, len(number_of_nodes_list)):
# num = number_of_nodes_list[j]
# G = nx.tournament.random_tournament(num)
# H = nx_parallel.ParallelDiGraph(G)
# t1 = time.time()
# c = nx.tournament.is_reachable(H, 1, num)
# t2 = time.time()
# parallelTime = t2-t1
# t1 = time.time()
# c = nx.tournament.is_reachable(G, 1, num)
# t2 = time.time()
# stdTime = t2-t1
# timesFaster = stdTime/parallelTime
# heatmapDF.at[j, 3] = timesFaster
# Code to create for row of heatmap specifically for tournaments
# for i in range(0, len(pList)):
# p = pList[i]
# for j in range(0, len(number_of_nodes_list)):
# num = number_of_nodes_list[j]
# G = nx.tournament.random_tournament(num)
# H = nx_parallel.ParallelDiGraph(G)
# t1 = time.time()
# c = nx.tournament.is_reachable(H, 1, num)
# t2 = time.time()
# parallelTime = t2-t1
# t1 = time.time()
# c = nx.tournament.is_reachable(G, 1, num)
# t2 = time.time()
# stdTime = t2-t1
# timesFaster = stdTime/parallelTime
# heatmapDF.at[j, 3] = timesFaster

# plotting the heatmap with numbers and a green color scheme
plt.figure(figsize=(20, 4))
hm = sns.heatmap(data=heatmapDF.T, annot=True, cmap="Greens", cbar=True)
# plotting the heatmap with numbers and a green color scheme
plt.figure(figsize=(20, 4))
hm = sns.heatmap(data=heatmapDF.T, annot=True, cmap="Greens", cbar=True)

# Remove the tick labels on both axes
hm.set_yticklabels(pList)
# Remove the tick labels on both axes
hm.set_yticklabels(pList)

# Adding x-axis labels
hm.set_xticklabels(number_of_nodes_list)
# Adding x-axis labels
hm.set_xticklabels(number_of_nodes_list)

# Rotating the x-axis labels for better readability (optional)
plt.xticks(rotation=45)
plt.yticks(rotation=20)
plt.title(
"Small Scale Demo: Times Speedups of " + currFun.__name__ + " compared to networkx"
)
plt.xlabel("Number of Vertices")
plt.ylabel("Edge Probability")
print(currFun.__name__)
# Rotating the x-axis labels for better readability (optional)
plt.xticks(rotation=45)
plt.yticks(rotation=20)
plt.title(
"Small Scale Demo: Times Speedups of " + currFun.__name__ + " compared to networkx"
)
plt.xlabel("Number of Vertices")
plt.ylabel("Edge Probability")
print(currFun.__name__)

# displaying the plotted heatmap
plt.tight_layout()
plt.savefig("timing/" + "heatmap_" + currFun.__name__ + "_timing.png")
# displaying the plotted heatmap
plt.tight_layout()
plt.savefig("timing/" + "heatmap_" + currFun.__name__ + "_timing.png")