[Feat] Implement the pipeline for loading vineyard graph as graphlearn_torch dataset and training on a single machine (#3156)

## What do these changes do?
This PR introduces the following changes:
1. A new graphlearn_torch API for the session (a minimal client-side sketch follows this list).
2. A script to launch the graphlearn_torch server with a handle and config.
3. An example of GraphSAGE node classification on the ogbn-arxiv dataset in GraphScope on a single machine.
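
For orientation, here is a minimal sketch of the client side of this pipeline, assuming the coordinator has already launched the graphlearn_torch servers as in the diffs below. The endpoint and the server/client counts are illustrative placeholders derived from the shape of the handle, not values fixed by this PR.

```python
import graphscope.learning.graphlearn_torch as glt

# Connect to the graphlearn_torch servers launched by the coordinator.
# num_servers/num_clients and the master endpoint normally come from the
# handle built by the coordinator; the literals here are placeholders.
glt.distributed.init_client(
    num_servers=1,
    num_clients=1,
    client_rank=0,
    master_addr="localhost",
    master_port=11234,  # i.e. handle["server_client_master_port"]
)
# ... build a distributed neighbor loader and run the GraphSAGE loop ...
glt.distributed.shutdown_client()
```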
## Related issue number
#3157
Zhanghyi authored Sep 11, 2023
1 parent 5741a3e commit 19499a2
Showing 14 changed files with 548 additions and 15 deletions.
8 changes: 6 additions & 2 deletions coordinator/gscoordinator/coordinator.py
@@ -531,10 +531,14 @@ def _match_frontend_endpoint(pattern, lines):
def CreateLearningInstance(self, request, context):
object_id = request.object_id
logger.info("Create learning instance with object id %ld", object_id)
handle, config = request.handle, request.config
handle, config, learning_backend = (
request.handle,
request.config,
request.learning_backend,
)
try:
endpoints = self._launcher.create_learning_instance(
object_id, handle, config
object_id, handle, config, learning_backend
)
self._object_manager.put(object_id, LearningInstanceManager(object_id))
except Exception as e:
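With this change the handler reads a learning_backend field from the request and forwards it to the launcher, which dispatches on it (see local_launcher.py below). A hypothetical sketch of a request carrying the new field; the message and field names are assumptions inferred from what the handler reads, not confirmed by this diff:

```python
from graphscope.proto import message_pb2

# Hypothetical request: the message name and its fields are assumed from
# the attributes CreateLearningInstance reads (object_id, handle, config,
# learning_backend).
request = message_pb2.CreateLearningInstanceRequest(
    object_id=12345,
    handle="<base64-encoded handle>",
    config="<base64-encoded config>",
    learning_backend=message_pb2.LearningBackend.GRAPHLEARN_TORCH,
)
```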
4 changes: 2 additions & 2 deletions coordinator/gscoordinator/kubernetes_launcher.py
@@ -1291,7 +1291,7 @@ def _distribute_learning_process(
self._learning_instance_processes[object_id] = []
for pod_index, pod in enumerate(self._pod_name_list):
container = LEARNING_CONTAINER_NAME
sub_cmd = f"python3 -m gscoordinator.learning {handle} {config} {pod_index}"
sub_cmd = f"python3 -m gscoordinator.launch_graphlearn {handle} {config} {pod_index}"
cmd = f"kubectl -n {self._namespace} exec -it -c {container} {pod} -- {sub_cmd}"
logger.debug("launching learning server: %s", " ".join(cmd))
proc = subprocess.Popen(
@@ -1321,7 +1321,7 @@ def _distribute_learning_process(
self._api_client, object_id, pod_host_ip_list
)

def create_learning_instance(self, object_id, handle, config):
def create_learning_instance(self, object_id, handle, config, learning_backend):
pod_name_list, _, pod_host_ip_list = self._allocate_learning_engine(object_id)
if not pod_name_list or not pod_host_ip_list:
raise RuntimeError("Failed to allocate learning engine")
coordinator/gscoordinator/{learning.py → launch_graphlearn.py}
@@ -67,7 +67,10 @@ def launch_server(handle, config, server_index):

if __name__ == "__main__":
if len(sys.argv) < 3:
print("Usage: ./learning.py <handle> <config> <server_index>", file=sys.stderr)
print(
"Usage: ./launch_graphlearn.py <handle> <config> <server_index>",
file=sys.stderr,
)
sys.exit(-1)

handle = decode_arg(sys.argv[1])
88 changes: 88 additions & 0 deletions coordinator/gscoordinator/launch_graphlearn_torch.py
@@ -0,0 +1,88 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright 2020 Alibaba Group Holding Limited. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import base64
import logging
import os.path as osp
import pickle
import sys

import graphscope.learning.graphlearn_torch as glt
import torch

logger = logging.getLogger("graphscope")


def decode_arg(arg):
if isinstance(arg, dict):
return arg
return pickle.loads(base64.b64decode(arg))


def run_server_proc(proc_rank, handle, config, server_rank, dataset):
glt.distributed.init_server(
num_servers=handle["num_servers"],
num_clients=handle["num_clients"],
server_rank=server_rank,
dataset=dataset,
master_addr=handle["master_addr"],
master_port=handle["server_client_master_port"],
num_rpc_threads=16,
# server_group_name="dist_train_supervised_sage_server",
)
logger.info(f"-- [Server {server_rank}] Waiting for exit ...")
glt.distributed.wait_and_shutdown_server()
logger.info(f"-- [Server {server_rank}] Exited ...")


def launch_graphlearn_torch_server(handle, config, server_rank):
logger.info(f"-- [Server {server_rank}] Initializing server ...")

edge_dir = config.pop("edge_dir")
random_node_split = config.pop("random_node_split")
dataset = glt.distributed.DistDataset(edge_dir=edge_dir)
dataset.load_vineyard(
vineyard_id=str(handle["vineyard_id"]),
vineyard_socket=handle["vineyard_socket"],
**config,
)
if random_node_split is not None:
dataset.random_node_split(**random_node_split)
logger.info(f"-- [Server {server_rank}] Initializing server ...")

torch.multiprocessing.spawn(
fn=run_server_proc, args=(handle, config, server_rank, dataset), nprocs=1
)


if __name__ == "__main__":
if len(sys.argv) < 4:
print(
"Usage: ./launch_graphlearn_torch.py <handle> <config> <server_index>",
file=sys.stderr,
)
sys.exit(-1)

handle = decode_arg(sys.argv[1])
config = decode_arg(sys.argv[2])
server_index = int(sys.argv[3])

logger.info(
f"launch_graphlearn_torch_server handle: {handle} config: {config} server_index: {server_index}"
)
launch_graphlearn_torch_server(handle, config, server_index)
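
For reference, the handle and config arrive on the command line pickled and base64-encoded (the encoding side lives in _create_graphlearn_torch_instance below); a minimal round-trip sketch with illustrative values:

```python
import base64
import pickle

# Illustrative handle; the keys mirror the ones the server reads above.
handle = {
    "num_servers": 1,
    "num_clients": 1,
    "vineyard_id": 12345,
    "vineyard_socket": "/tmp/vineyard.sock",
    "master_addr": "localhost",
    "server_client_master_port": 11234,
}
encoded = base64.b64encode(pickle.dumps(handle))
decoded = pickle.loads(base64.b64decode(encoded))  # what decode_arg does
assert decoded == handle
```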
4 changes: 3 additions & 1 deletion coordinator/gscoordinator/launcher.py
@@ -95,7 +95,9 @@ def create_interactive_instance(
pass

@abstractmethod
def create_learning_instance(self, object_id: int, handle: str, config: str):
def create_learning_instance(
self, object_id: int, handle: str, config: str, learning_backend: int
):
pass

@abstractmethod
75 changes: 72 additions & 3 deletions coordinator/gscoordinator/local_launcher.py
@@ -33,6 +33,7 @@
from graphscope.framework.utils import get_java_version
from graphscope.framework.utils import get_tempdir
from graphscope.framework.utils import is_free_port
from graphscope.proto import message_pb2
from graphscope.proto import types_pb2

from gscoordinator.launcher import AbstractLauncher
@@ -245,7 +246,19 @@ def _popen_helper(cmd, cwd, env, stdout=None, stderr=None):
)
return process

def create_learning_instance(self, object_id, handle, config):
def create_learning_instance(self, object_id, handle, config, learning_backend):
if learning_backend == message_pb2.LearningBackend.GRAPHLEARN:
return self._create_graphlearn_instance(
object_id=object_id, handle=handle, config=config
)
elif learning_backend == message_pb2.LearningBackend.GRAPHLEARN_TORCH:
return self._create_graphlearn_torch_instance(
object_id=object_id, handle=handle, config=config
)
else:
raise ValueError("invalid learning backend")

def _create_graphlearn_instance(self, object_id, handle, config):
# prepare argument
handle = json.loads(
base64.b64decode(handle.encode("utf-8", errors="ignore")).decode(
Expand Down Expand Up @@ -275,12 +288,12 @@ def create_learning_instance(self, object_id, handle, config):
cmd = [
sys.executable,
"-m",
"gscoordinator.learning",
"gscoordinator.launch_graphlearn",
handle,
config,
str(index),
]
logger.debug("launching learning server: %s", " ".join(cmd))
logger.debug("launching graphlearn server: %s", " ".join(cmd))

proc = self._popen_helper(cmd, cwd=None, env=env)
stdout_watcher = PipeWatcher(proc.stdout, sys.stdout)
@@ -289,6 +302,62 @@ def create_learning_instance(self, object_id, handle, config):
self._learning_instance_processes[object_id].append(proc)
return server_list

def _create_graphlearn_torch_instance(self, object_id, handle, config):
import pickle

handle = pickle.loads(base64.b64decode(handle))

server_client_master_port = get_free_port("localhost")
handle["server_client_master_port"] = server_client_master_port

server_list = [f"localhost:{server_client_master_port}"]
# for train, val and test
for _ in range(3):
server_list.append("localhost:" + str(get_free_port("localhost")))

handle = base64.b64encode(pickle.dumps(handle))

# launch the server
env = os.environ.copy()
# set coordinator dir to PYTHONPATH
python_path = (
env.get("PYTHONPATH", "")
+ os.pathsep
+ os.path.dirname(os.path.dirname(__file__))
)
env["PYTHONPATH"] = python_path

self._learning_instance_processes[object_id] = []
for index in range(self._num_workers):
cmd = [
sys.executable,
"-m",
"gscoordinator.launch_graphlearn_torch",
handle,
config,
str(index),
]
logger.debug("launching graphlearn_torch server: %s", " ".join(str(cmd)))

proc = subprocess.Popen(
cmd,
env=env,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
encoding="utf-8",
errors="replace",
universal_newlines=True,
bufsize=1,
)
stdout_watcher = PipeWatcher(
proc.stdout,
sys.stdout,
suppressed=(not logger.isEnabledFor(logging.DEBUG)),
)
setattr(proc, "stdout_watcher", stdout_watcher)
self._learning_instance_processes[object_id].append(proc)
return server_list

def close_analytical_instance(self):
self._stop_subprocess(self._analytical_engine_process, kill=True)
self._analytical_engine_endpoint = None
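The server list built in _create_graphlearn_torch_instance reserves one master port for server-client coordination plus one extra endpoint each for the train, val, and test loaders. A self-contained sketch of that allocation; this get_free_port is a stand-in for the coordinator's helper of the same name:

```python
import socket

def get_free_port(host="localhost"):
    # Stand-in for the coordinator's helper: let the OS pick a free port.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind((host, 0))
        return s.getsockname()[1]

master_port = get_free_port()
server_list = [f"localhost:{master_port}"]
for _ in range(3):  # one endpoint each for the train, val and test loaders
    server_list.append(f"localhost:{get_free_port()}")
print(server_list)
```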
