feat(learning): Integrate GLTorch into GraphScope #3230

Merged · 17 commits · Sep 25, 2023
4 changes: 4 additions & 0 deletions .gitmodules
@@ -5,3 +5,7 @@
 [submodule "flex/grin"]
 	path = flex/grin
 	url = https://github.com/GraphScope/GRIN.git
+
+[submodule "learning_engine/graphlearn-for-pytorch"]
+	path = learning_engine/graphlearn-for-pytorch
+	url = https://github.com/alibaba/graphlearn-for-pytorch.git
5 changes: 5 additions & 0 deletions Makefile
@@ -18,6 +18,8 @@ NETWORKX ?= ON
 # testing build option
 BUILD_TEST ?= OFF
 
+# whether to build graphlearn-torch extension (graphlearn is built by default)
+WITH_GLTORCH ?= ON
 
 # INSTALL_PREFIX is environment variable, but if it is not set, then set default value
 ifeq ($(INSTALL_PREFIX),)
@@ -76,6 +78,9 @@ client: learning
 	python3 -m pip install -r requirements.txt -r requirements-dev.txt --user && \
 	export PATH=$(PATH):$(HOME)/.local/bin && \
 	python3 setup.py build_ext --inplace --user && \
+	if [ $(WITH_GLTORCH) = ON ]; then \
+		python3 setup.py build_gltorch_ext --inplace --user; \
+	fi && \
 	python3 -m pip install --user --no-build-isolation --editable $(CLIENT_DIR) && \
 	rm -rf $(CLIENT_DIR)/*.egg-info

8 changes: 6 additions & 2 deletions coordinator/gscoordinator/coordinator.py
@@ -531,10 +531,14 @@ def _match_frontend_endpoint(pattern, lines):
     def CreateLearningInstance(self, request, context):
         object_id = request.object_id
         logger.info("Create learning instance with object id %ld", object_id)
-        handle, config = request.handle, request.config
+        handle, config, learning_backend = (
+            request.handle,
+            request.config,
+            request.learning_backend,
+        )
         try:
             endpoints = self._launcher.create_learning_instance(
-                object_id, handle, config
+                object_id, handle, config, learning_backend
             )
             self._object_manager.put(object_id, LearningInstanceManager(object_id))
         except Exception as e:
4 changes: 2 additions & 2 deletions coordinator/gscoordinator/kubernetes_launcher.py
@@ -1291,7 +1291,7 @@ def _distribute_learning_process(
         self._learning_instance_processes[object_id] = []
         for pod_index, pod in enumerate(self._pod_name_list):
             container = LEARNING_CONTAINER_NAME
-            sub_cmd = f"python3 -m gscoordinator.learning {handle} {config} {pod_index}"
+            sub_cmd = f"python3 -m gscoordinator.launch_graphlearn {handle} {config} {pod_index}"
             cmd = f"kubectl -n {self._namespace} exec -it -c {container} {pod} -- {sub_cmd}"
             logger.debug("launching learning server: %s", " ".join(cmd))
             proc = subprocess.Popen(
@@ -1321,7 +1321,7 @@ def _distribute_learning_process(
             self._api_client, object_id, pod_host_ip_list
         )
 
-    def create_learning_instance(self, object_id, handle, config):
+    def create_learning_instance(self, object_id, handle, config, learning_backend):
         pod_name_list, _, pod_host_ip_list = self._allocate_learning_engine(object_id)
         if not pod_name_list or not pod_host_ip_list:
             raise RuntimeError("Failed to allocate learning engine")
coordinator/gscoordinator/learning.py → coordinator/gscoordinator/launch_graphlearn.py
@@ -67,7 +67,10 @@ def launch_server(handle, config, server_index):
 
 if __name__ == "__main__":
     if len(sys.argv) < 3:
-        print("Usage: ./learning.py <handle> <config> <server_index>", file=sys.stderr)
+        print(
+            "Usage: ./launch_graphlearn.py <handle> <config> <server_index>",
+            file=sys.stderr,
+        )
         sys.exit(-1)
 
     handle = decode_arg(sys.argv[1])
93 changes: 93 additions & 0 deletions coordinator/gscoordinator/launch_graphlearn_torch.py
@@ -0,0 +1,93 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright 2020 Alibaba Group Holding Limited. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import base64
import json
import logging
import sys

import graphscope.learning.graphlearn_torch as glt
import torch
from graphscope.learning.gl_torch_graph import GLTorchGraph

logger = logging.getLogger("graphscope")


def decode_arg(arg):
    if isinstance(arg, dict):
        return arg
    return json.loads(
        base64.b64decode(arg.encode("utf-8", errors="ignore")).decode(
            "utf-8", errors="ignore"
        )
    )


def run_server_proc(proc_rank, handle, config, server_rank, dataset):
    glt.distributed.init_server(
        num_servers=handle["num_servers"],
        server_rank=server_rank,
        dataset=dataset,
        master_addr=handle["master_addr"],
        master_port=handle["server_client_master_port"],
        num_rpc_threads=16,
        is_dynamic=True,
    )
    logger.info(f"-- [Server {server_rank}] Waiting for exit ...")
    glt.distributed.wait_and_shutdown_server()
    logger.info(f"-- [Server {server_rank}] Exited ...")


def launch_graphlearn_torch_server(handle, config, server_rank):
    logger.info(f"-- [Server {server_rank}] Initializing server ...")

    edge_dir = config.pop("edge_dir")
    random_node_split = config.pop("random_node_split")
    dataset = glt.distributed.DistDataset(edge_dir=edge_dir)
    dataset.load_vineyard(
        vineyard_id=str(handle["vineyard_id"]),
        vineyard_socket=handle["vineyard_socket"],
        **config,
    )
    if random_node_split is not None:
        dataset.random_node_split(**random_node_split)
    logger.info(f"-- [Server {server_rank}] Dataset loaded, spawning server process ...")

    torch.multiprocessing.spawn(
        fn=run_server_proc, args=(handle, config, server_rank, dataset), nprocs=1
    )


if __name__ == "__main__":
    if len(sys.argv) < 4:
        print(
            "Usage: ./launch_graphlearn_torch.py <handle> <config> <server_index>",
            file=sys.stderr,
        )
        sys.exit(-1)

    handle = decode_arg(sys.argv[1])
    config = decode_arg(sys.argv[2])
    server_index = int(sys.argv[3])
    config = GLTorchGraph.reverse_transform_config(config)

    logger.info(
        f"launch_graphlearn_torch_server handle: {handle} config: {config} server_index: {server_index}"
    )
    launch_graphlearn_torch_server(handle, config, server_index)
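
The handle and config arguments arrive base64-encoded; `decode_arg` above reverses the encoding that the local launcher applies before spawning this script. A minimal round-trip sketch (the handle fields mirror the ones the server reads above, but the values are illustrative, not from a real deployment):

```python
import base64
import json


def encode_arg(arg: dict) -> str:
    # Inverse of decode_arg: JSON-serialize, then base64-encode to a str,
    # matching what local_launcher.py does before launching the server.
    return base64.b64encode(json.dumps(arg).encode("utf-8")).decode("utf-8")


def decode_arg(arg):
    # Same logic as decode_arg in launch_graphlearn_torch.py above.
    if isinstance(arg, dict):
        return arg
    return json.loads(
        base64.b64decode(arg.encode("utf-8", errors="ignore")).decode(
            "utf-8", errors="ignore"
        )
    )


# A hypothetical handle carrying the fields launch_graphlearn_torch.py reads.
handle = {
    "num_servers": 1,
    "master_addr": "localhost",
    "server_client_master_port": 11234,
    "vineyard_id": 12345,
    "vineyard_socket": "/tmp/vineyard.sock",
}

assert decode_arg(encode_arg(handle)) == handle
```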
4 changes: 3 additions & 1 deletion coordinator/gscoordinator/launcher.py
@@ -95,7 +95,9 @@ def create_interactive_instance(
         pass
 
     @abstractmethod
-    def create_learning_instance(self, object_id: int, handle: str, config: str):
+    def create_learning_instance(
+        self, object_id: int, handle: str, config: str, learning_backend: int
+    ):
         pass
 
     @abstractmethod
79 changes: 76 additions & 3 deletions coordinator/gscoordinator/local_launcher.py
@@ -33,6 +33,7 @@
 from graphscope.framework.utils import get_java_version
 from graphscope.framework.utils import get_tempdir
 from graphscope.framework.utils import is_free_port
+from graphscope.proto import message_pb2
 from graphscope.proto import types_pb2
 
 from gscoordinator.launcher import AbstractLauncher
@@ -245,7 +246,19 @@ def _popen_helper(cmd, cwd, env, stdout=None, stderr=None):
         )
         return process
 
-    def create_learning_instance(self, object_id, handle, config):
+    def create_learning_instance(self, object_id, handle, config, learning_backend):
+        if learning_backend == message_pb2.LearningBackend.GRAPHLEARN:
+            return self._create_graphlearn_instance(
+                object_id=object_id, handle=handle, config=config
+            )
+        elif learning_backend == message_pb2.LearningBackend.GRAPHLEARN_TORCH:
+            return self._create_graphlearn_torch_instance(
+                object_id=object_id, handle=handle, config=config
+            )
+        else:
+            raise ValueError("invalid learning backend")
+
+    def _create_graphlearn_instance(self, object_id, handle, config):
         # prepare argument
         handle = json.loads(
             base64.b64decode(handle.encode("utf-8", errors="ignore")).decode(
@@ -275,12 +288,12 @@ def create_learning_instance(self, object_id, handle, config):
             cmd = [
                 sys.executable,
                 "-m",
-                "gscoordinator.learning",
+                "gscoordinator.launch_graphlearn",
                 handle,
                 config,
                 str(index),
             ]
-            logger.debug("launching learning server: %s", " ".join(cmd))
+            logger.debug("launching graphlearn server: %s", " ".join(cmd))
 
             proc = self._popen_helper(cmd, cwd=None, env=env)
             stdout_watcher = PipeWatcher(proc.stdout, sys.stdout)
@@ -289,6 +302,66 @@ def create_learning_instance(self, object_id, handle, config):
             self._learning_instance_processes[object_id].append(proc)
         return server_list
 
+    def _create_graphlearn_torch_instance(self, object_id, handle, config):
+        handle = json.loads(
+            base64.b64decode(handle.encode("utf-8", errors="ignore")).decode(
+                "utf-8", errors="ignore"
+            )
+        )
+
+        server_client_master_port = get_free_port("localhost")
+        handle["server_client_master_port"] = server_client_master_port
+
+        server_list = [f"localhost:{server_client_master_port}"]
+        # for train, val and test
+        for _ in range(3):
+            server_list.append("localhost:" + str(get_free_port("localhost")))
+
+        handle = base64.b64encode(
+            json.dumps(handle).encode("utf-8", errors="ignore")
+        ).decode("utf-8", errors="ignore")
+
+        # launch the server
+        env = os.environ.copy()
+        # set coordinator dir to PYTHONPATH
+        python_path = (
+            env.get("PYTHONPATH", "")
+            + os.pathsep
+            + os.path.dirname(os.path.dirname(__file__))
+        )
+        env["PYTHONPATH"] = python_path
+
+        self._learning_instance_processes[object_id] = []
+        for index in range(self._num_workers):
+            cmd = [
+                sys.executable,
+                "-m",
+                "gscoordinator.launch_graphlearn_torch",
+                handle,
+                config,
+                str(index),
+            ]
+            logger.debug("launching graphlearn_torch server: %s", " ".join(cmd))
+
+            proc = subprocess.Popen(
+                cmd,
+                env=env,
+                stdout=subprocess.PIPE,
+                stderr=subprocess.STDOUT,
+                encoding="utf-8",
+                errors="replace",
+                universal_newlines=True,
+                bufsize=1,
+            )
+            stdout_watcher = PipeWatcher(
+                proc.stdout,
+                sys.stdout,
+                suppressed=(not logger.isEnabledFor(logging.DEBUG)),
+            )
+            setattr(proc, "stdout_watcher", stdout_watcher)
+            self._learning_instance_processes[object_id].append(proc)
+        return server_list
+
     def close_analytical_instance(self):
         self._stop_subprocess(self._analytical_engine_process, kill=True)
         self._analytical_engine_endpoint = None
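
With the dispatch above, callers now select a learning backend explicitly via the enum imported from `message_pb2`. A minimal sketch of a call site (assuming an already-constructed launcher and the base64-encoded handle/config shown earlier; the wrapper function name is illustrative):

```python
from graphscope.proto import message_pb2


def start_graphlearn_torch_servers(launcher, object_id, handle, config):
    # `launcher` is any AbstractLauncher implementation (e.g. LocalLauncher).
    # Returns the server endpoint list; per _create_graphlearn_torch_instance,
    # its first entry is the server/client master endpoint.
    return launcher.create_learning_instance(
        object_id,
        handle,
        config,
        message_pb2.LearningBackend.GRAPHLEARN_TORCH,
    )
```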
29 changes: 26 additions & 3 deletions docs/learning_engine/guide_and_examples.md
@@ -14,10 +14,12 @@ tutorial_node_classification_k8s
 This section contains a guide for the learning engine and a number of examples.
 
 ```{tip}
-We assume you have read the [getting_started](getting_started.md) section and know how to launch a GraphScope session.
+We assume you have read the [getting_started](getting_started.md) section and
+know how to launch a GraphScope session.
 ```
 
-We present an end-to-end example, demonstrating how GLE trains a node classification model on a citation network using the local mode of GraphScope.
+We present an end-to-end example, demonstrating how GLE trains a node
+classification model on a citation network using the local mode of GraphScope.
 
 ````{panels}
 :header: text-center
@@ -31,7 +33,11 @@ We present an end-to-end example, demonstrating how GLE trains a node classifica
 Training a Node Classification Model on Your Local Machine.
 ````
 
-GraphScope is designed for processing large graphs, which are usually hard to fit in the memory of a single machine. With vineyard as the distributed in-memory data manager, GraphScope supports running on a cluster managed by Kubernetes (k8s). Next, we revisit the example presented in the first tutorial, showing how GraphScope processes the node classification task on a Kubernetes cluster.
+GraphScope is designed for processing large graphs, which are usually hard to
+fit in the memory of a single machine. With vineyard as the distributed
+in-memory data manager, GraphScope supports running on a cluster managed by
+Kubernetes (k8s). Next, we revisit the example presented in the first tutorial,
+showing how GraphScope processes the node classification task on a Kubernetes cluster.
 
 
 ````{panels}
@@ -45,3 +51,20 @@
 ^^^^^^^^^^^^^^
 Training a Node Classification Model on K8s Cluster
 ````
+
+
+GraphScope is also compatible with PyG models; the following example shows
+how to train a PyG model using GraphScope on your local machine.
+
+
+````{panels}
+:header: text-center
+:column: col-lg-12 p-2
+
+```{link-button} tutorial_node_classification_pyg_local.html
+:text: Tutorial
+:classes: btn-block stretched-link
+```
+^^^^^^^^^^^^^^
+Training a Node Classification Model (PyG) on Your Local Machine
+````
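
The tutorials referenced above assume a running GraphScope session. A minimal local-mode sketch (assuming a default pip installation of graphscope; see getting_started.md for details):

```python
import graphscope

# Launch a session on the local machine ("hosts" cluster type).
sess = graphscope.session(cluster_type="hosts")
# ... build a graph and run GLE / GLTorch training here ...
sess.close()
```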