Skip to content

Commit

Permalink
ENH : adding parallel implementations of all_pairs_ algos (#33)
Browse files Browse the repository at this point in the history
* initial commit
resolved conflicts(rebased)

* rm extra docs
resolved conflicts(rebased)

* added unweighted.py rm johnson and all non all_pairs_ algos from weighted.py

* added all_pairs_node_connectivity
resolved conflicts(rebased)

* Update weighted.py
resolved conflicts(rebased)

* modifying G

* fixed all_pairs_node_connectivity

* un-updated ParallelGraph class

* style fixes

* changed all_pairs def

* adding directed to _calculate_all_pairs_node_connectivity_subset

* updated docs of all funcs

* style fix

* added benchmarks

* style fix

* added 6 heatmaps (no speedups in 3)

* used loky backend in approximation.connectivity.all_pairs_node_connectivity and added connectivity.connectivity.all_pairs_node_connectivity and all_pairs_all_shortest_paths

* added get_chunks to shortest paths algos

* added chunking in all_pairs_node_connectivity, added benchmarks, fixed an if-else statement, added and updated heatmaps (no speedups for all_pairs_shortest_path_length)

* updated docstrings of all 9 funcs

* typo fix

Co-authored-by: Dan Schult <[email protected]>

---------

Co-authored-by: Dan Schult <[email protected]>
  • Loading branch information
Schefflera-Arboricola and dschult authored Mar 26, 2024
1 parent 87b822a commit 84d5635
Show file tree
Hide file tree
Showing 21 changed files with 621 additions and 3 deletions.
22 changes: 22 additions & 0 deletions benchmarks/benchmarks/bench_approximation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from .common import (
backends,
num_nodes,
edge_prob,
get_cached_gnp_random_graph,
Benchmark,
)
import networkx as nx
import nx_parallel as nxp


class Connectivity(Benchmark):
    """Benchmarks for the approximate all-pairs node connectivity routine."""

    params = [backends, num_nodes, edge_prob]
    param_names = ["backend", "num_nodes", "edge_prob"]

    def time_approximate_all_pairs_node_connectivity(
        self, backend, num_nodes, edge_prob
    ):
        # Fetch the (cached) random graph; wrap it so the parallel
        # backend is dispatched to when requested.
        graph = get_cached_gnp_random_graph(num_nodes, edge_prob)
        if backend == "parallel":
            graph = nxp.ParallelGraph(graph)
        _ = nx.algorithms.approximation.connectivity.all_pairs_node_connectivity(graph)
20 changes: 20 additions & 0 deletions benchmarks/benchmarks/bench_connectivity.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
from .common import (
backends,
num_nodes,
edge_prob,
get_cached_gnp_random_graph,
Benchmark,
)
import networkx as nx
import nx_parallel as nxp


class Connectivity(Benchmark):
    """Benchmarks for the exact (flow-based) all-pairs node connectivity routine."""

    params = [backends, num_nodes, edge_prob]
    param_names = ["backend", "num_nodes", "edge_prob"]

    def time_all_pairs_node_connectivity(self, backend, num_nodes, edge_prob):
        # Fetch the (cached) random graph; wrap it so the parallel
        # backend is dispatched to when requested.
        graph = get_cached_gnp_random_graph(num_nodes, edge_prob)
        if backend == "parallel":
            graph = nxp.ParallelGraph(graph)
        _ = nx.algorithms.connectivity.connectivity.all_pairs_node_connectivity(graph)
38 changes: 38 additions & 0 deletions benchmarks/benchmarks/bench_shortest_paths.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,48 @@
import networkx as nx


class Generic(Benchmark):
    """Benchmarks for generic (method-agnostic) shortest-path algorithms."""

    params = [backends, num_nodes, edge_prob]
    param_names = ["backend", "num_nodes", "edge_prob"]

    def time_all_pairs_all_shortest_paths(self, backend, num_nodes, edge_prob):
        graph = get_cached_gnp_random_graph(num_nodes, edge_prob, is_weighted=True)
        # Materialize the generator so the full computation is timed.
        _ = dict(
            nx.all_pairs_all_shortest_paths(graph, weight="weight", backend=backend)
        )


class Unweighted(Benchmark):
    """Benchmarks for unweighted all-pairs shortest-path algorithms."""

    params = [backends, num_nodes, edge_prob]
    param_names = ["backend", "num_nodes", "edge_prob"]

    def time_all_pairs_shortest_path_length(self, backend, num_nodes, edge_prob):
        graph = get_cached_gnp_random_graph(num_nodes, edge_prob)
        # dict(...) forces the lazy generator so the work is actually timed.
        _ = dict(nx.all_pairs_shortest_path_length(graph, backend=backend))

    def time_all_pairs_shortest_path(self, backend, num_nodes, edge_prob):
        graph = get_cached_gnp_random_graph(num_nodes, edge_prob)
        _ = dict(nx.all_pairs_shortest_path(graph, backend=backend))


class Weighted(Benchmark):
    """Benchmarks for weighted all-pairs shortest-path algorithms."""

    params = [backends, num_nodes, edge_prob]
    param_names = ["backend", "num_nodes", "edge_prob"]

    def _graph(self, num_nodes, edge_prob):
        # Shared fixture: every weighted benchmark uses the same cached
        # weighted G(n, p) random graph.
        return get_cached_gnp_random_graph(num_nodes, edge_prob, is_weighted=True)

    def time_all_pairs_dijkstra(self, backend, num_nodes, edge_prob):
        # dict(...) drains the lazy generator so the work is actually timed.
        _ = dict(nx.all_pairs_dijkstra(self._graph(num_nodes, edge_prob), backend=backend))

    def time_all_pairs_dijkstra_path_length(self, backend, num_nodes, edge_prob):
        _ = dict(
            nx.all_pairs_dijkstra_path_length(
                self._graph(num_nodes, edge_prob), backend=backend
            )
        )

    def time_all_pairs_dijkstra_path(self, backend, num_nodes, edge_prob):
        _ = dict(
            nx.all_pairs_dijkstra_path(self._graph(num_nodes, edge_prob), backend=backend)
        )

    def time_all_pairs_bellman_ford_path_length(self, backend, num_nodes, edge_prob):
        _ = dict(
            nx.all_pairs_bellman_ford_path_length(
                self._graph(num_nodes, edge_prob), backend=backend
            )
        )

    def time_all_pairs_bellman_ford_path(self, backend, num_nodes, edge_prob):
        _ = dict(
            nx.all_pairs_bellman_ford_path(
                self._graph(num_nodes, edge_prob), backend=backend
            )
        )
Expand Down
2 changes: 2 additions & 0 deletions nx_parallel/algorithms/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
from .bipartite import *
from .centrality import *
from .shortest_paths import *
from .approximation import *
from .connectivity import *

# modules
from .efficiency_measures import *
Expand Down
1 change: 1 addition & 0 deletions nx_parallel/algorithms/approximation/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .connectivity import *
71 changes: 71 additions & 0 deletions nx_parallel/algorithms/approximation/connectivity.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
"""Parallel implementations of fast approximation for node connectivity"""
import itertools
from joblib import Parallel, delayed
from networkx.algorithms.approximation.connectivity import local_node_connectivity
import nx_parallel as nxp

__all__ = [
"all_pairs_node_connectivity",
]


def all_pairs_node_connectivity(G, nbunch=None, cutoff=None, get_chunks="chunks"):
    """Approximate node connectivity for all pairs of nodes, computed in parallel.

    The list of node pairs — permutations of `nbunch` for directed graphs,
    combinations for undirected ones — is split into chunks. Each chunk's
    local node connectivities are computed by a joblib worker, and the
    per-chunk results are merged into a single dict-of-dicts, which is
    returned.

    Parameters
    ----------
    get_chunks : str, function (default = "chunks")
        A function that takes in `list(iter_func(nbunch, 2))` as input and
        returns an iterable `pairs_chunks`, where `iter_func` is
        `permutations` for directed graphs and `combinations` for undirected
        graphs. The default slices the list into `n` chunks, where `n` is the
        number of CPU cores, with each chunk holding at most 10 and at least
        1 pair.

    networkx.all_pairs_node_connectivity : https://networkx.org/documentation/stable/reference/algorithms/generated/networkx.algorithms.approximation.connectivity.all_pairs_node_connectivity.html
    """
    # Unwrap a ParallelGraph down to the underlying networkx graph.
    if hasattr(G, "graph_object"):
        G = G.graph_object

    # With no nbunch, iterate over the whole graph's nodes.
    nbunch = G if nbunch is None else set(nbunch)

    directed = G.is_directed()
    # Directed graphs need both (u, v) and (v, u); undirected only one.
    iter_func = itertools.permutations if directed else itertools.combinations

    all_pairs = {node: {} for node in nbunch}

    def _chunk_connectivities(pairs_chunk):
        # One joblib task: approximate connectivity for every pair in the chunk.
        return [
            (u, v, local_node_connectivity(G, u, v, cutoff=cutoff))
            for u, v in pairs_chunk
        ]

    pairs = list(iter_func(nbunch, 2))
    n_cpus = nxp.cpu_count()
    if get_chunks == "chunks":
        # Chunk size: at most 10 pairs per chunk, never fewer than 1.
        chunk_size = max(min(len(pairs) // n_cpus, 10), 1)
        pairs_chunks = nxp.chunks(pairs, chunk_size)
    else:
        pairs_chunks = get_chunks(pairs)

    # Lazily create one delayed task per chunk; nc = node connectivity.
    nc_tasks = (
        delayed(_chunk_connectivities)(pairs_chunk) for pairs_chunk in pairs_chunks
    )

    for chunk_result in Parallel(n_jobs=n_cpus)(nc_tasks):
        for u, v, conn in chunk_result:
            all_pairs[u][v] = conn
            if not directed:
                # Connectivity is symmetric in undirected graphs.
                all_pairs[v][u] = conn
    return all_pairs
1 change: 1 addition & 0 deletions nx_parallel/algorithms/connectivity/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .connectivity import *
80 changes: 80 additions & 0 deletions nx_parallel/algorithms/connectivity/connectivity.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
"""
Parallel flow based connectivity algorithms
"""

import itertools
from networkx.algorithms.flow import build_residual_network
from networkx.algorithms.connectivity.utils import build_auxiliary_node_connectivity
from networkx.algorithms.connectivity.connectivity import local_node_connectivity
from joblib import Parallel, delayed
import nx_parallel as nxp

__all__ = [
"all_pairs_node_connectivity",
]


def all_pairs_node_connectivity(G, nbunch=None, flow_func=None, get_chunks="chunks"):
    """Exact (flow-based) node connectivity for all pairs of nodes, in parallel.

    The list of node pairs — permutations of `nbunch` for directed graphs,
    combinations for undirected ones — is split into chunks. Each chunk's
    local node connectivities are computed by a joblib worker, and the
    per-chunk results are merged into a single dict-of-dicts, which is
    returned.

    Parameters
    ----------
    get_chunks : str, function (default = "chunks")
        A function that takes in `list(iter_func(nbunch, 2))` as input and
        returns an iterable `pairs_chunks`, where `iter_func` is
        `permutations` for directed graphs and `combinations` for undirected
        graphs. The default slices the list into `n` chunks, where `n` is the
        number of CPU cores, with each chunk holding at most 10 and at least
        1 pair.

    networkx.all_pairs_node_connectivity : https://networkx.org/documentation/stable/reference/algorithms/generated/networkx.algorithms.connectivity.connectivity.all_pairs_node_connectivity.html
    """
    # Unwrap a ParallelGraph down to the underlying networkx graph.
    if hasattr(G, "graph_object"):
        G = G.graph_object

    # With no nbunch, iterate over the whole graph's nodes.
    nbunch = G if nbunch is None else set(nbunch)

    directed = G.is_directed()
    # Directed graphs need both (u, v) and (v, u); undirected only one.
    iter_func = itertools.permutations if directed else itertools.combinations

    all_pairs = {node: {} for node in nbunch}

    # Build the auxiliary digraph and residual network once and reuse them
    # for every local_node_connectivity call.
    H = build_auxiliary_node_connectivity(G)
    R = build_residual_network(H, "capacity")
    kwargs = {"flow_func": flow_func, "auxiliary": H, "residual": R}

    def _chunk_connectivities(pairs_chunk):
        # One joblib task: exact connectivity for every pair in the chunk.
        return [
            (u, v, local_node_connectivity(G, u, v, **kwargs)) for u, v in pairs_chunk
        ]

    pairs = list(iter_func(nbunch, 2))
    n_cpus = nxp.cpu_count()
    if get_chunks == "chunks":
        # Chunk size: at most 10 pairs per chunk, never fewer than 1.
        chunk_size = max(min(len(pairs) // n_cpus, 10), 1)
        pairs_chunks = nxp.chunks(pairs, chunk_size)
    else:
        pairs_chunks = get_chunks(pairs)

    # Lazily create one delayed task per chunk; nc = node connectivity.
    nc_tasks = (
        delayed(_chunk_connectivities)(pairs_chunk) for pairs_chunk in pairs_chunks
    )

    for chunk_result in Parallel(n_jobs=n_cpus)(nc_tasks):
        for u, v, conn in chunk_result:
            all_pairs[u][v] = conn
            if not directed:
                # Connectivity is symmetric in undirected graphs.
                all_pairs[v][u] = conn
    return all_pairs
2 changes: 2 additions & 0 deletions nx_parallel/algorithms/shortest_paths/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
from .generic import *
from .weighted import *
from .unweighted import *
57 changes: 57 additions & 0 deletions nx_parallel/algorithms/shortest_paths/generic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
from networkx.algorithms.shortest_paths.generic import single_source_all_shortest_paths
from joblib import Parallel, delayed
import nx_parallel as nxp

__all__ = [
"all_pairs_all_shortest_paths",
]


def all_pairs_all_shortest_paths(
    G, weight=None, method="dijkstra", get_chunks="chunks"
):
    """Yield, for every node, all shortest paths to all other nodes — in parallel.

    The nodes are split into chunks; for each node in a chunk a joblib worker
    computes `single_source_all_shortest_paths`, and the (node, paths-dict)
    results are yielded lazily chunk by chunk.

    Parameters
    ----------
    get_chunks : str, function (default = "chunks")
        A function that takes in an iterable of all the nodes as input and
        returns an iterable `node_chunks`. The default chunking slices
        `G.nodes` into `n` chunks, where `n` is the number of CPU cores.

    networkx.single_source_all_shortest_paths : https://github.com/networkx/networkx/blob/de85e3fe52879f819e7a7924474fc6be3994e8e4/networkx/algorithms/shortest_paths/generic.py#L606
    """

    def _chunk_paths(node_chunk):
        # One joblib task: all shortest paths from every source in the chunk.
        # NOTE: closes over G, which is rebound to the unwrapped graph below
        # before any worker runs.
        return [
            (
                source,
                dict(
                    single_source_all_shortest_paths(
                        G, source, weight=weight, method=method
                    )
                ),
            )
            for source in node_chunk
        ]

    # Unwrap a ParallelGraph down to the underlying networkx graph.
    if hasattr(G, "graph_object"):
        G = G.graph_object

    nodes = G.nodes
    n_cpus = nxp.cpu_count()

    if get_chunks == "chunks":
        # Default: roughly one chunk per core, chunk size never below 1.
        node_chunks = nxp.chunks(nodes, max(len(nodes) // n_cpus, 1))
    else:
        node_chunks = get_chunks(nodes)

    path_tasks = (delayed(_chunk_paths)(node_chunk) for node_chunk in node_chunks)

    for chunk_result in Parallel(n_jobs=n_cpus)(path_tasks):
        yield from chunk_result
Loading

0 comments on commit 84d5635

Please sign in to comment.