[Feat] Implement the pipeline for loading vineyard graph as graphlearn_torch dataset and training on a single machine #3156

Merged: 11 commits, Sep 11, 2023
8 changes: 6 additions & 2 deletions coordinator/gscoordinator/coordinator.py
@@ -531,10 +531,14 @@ def _match_frontend_endpoint(pattern, lines):
def CreateLearningInstance(self, request, context):
object_id = request.object_id
logger.info("Create learning instance with object id %ld", object_id)
-handle, config = request.handle, request.config
+handle, config, learning_backend = (
+request.handle,
+request.config,
+request.learning_backend,
+)
try:
endpoints = self._launcher.create_learning_instance(
-object_id, handle, config
+object_id, handle, config, learning_backend
)
self._object_manager.put(object_id, LearningInstanceManager(object_id))
except Exception as e:
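
For context, the handler above forwards the new learning_backend field from the incoming gRPC request to the launcher unchanged. A minimal client-side sketch of such a request follows; the message name CreateLearningInstanceRequest is an assumption, while the field names and the LearningBackend enum are confirmed by this diff:

import base64
import pickle

from graphscope.proto import message_pb2

# Illustrative payloads; real handles/configs are produced by the client library.
handle = base64.b64encode(pickle.dumps({"vineyard_id": 1234})).decode("utf-8")
config = base64.b64encode(pickle.dumps({"edge_dir": "out"})).decode("utf-8")

request = message_pb2.CreateLearningInstanceRequest(  # message name is an assumption
    object_id=1234,
    handle=handle,
    config=config,
    learning_backend=message_pb2.LearningBackend.GRAPHLEARN_TORCH,
)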
4 changes: 2 additions & 2 deletions coordinator/gscoordinator/kubernetes_launcher.py
@@ -1291,7 +1291,7 @@ def _distribute_learning_process(
self._learning_instance_processes[object_id] = []
for pod_index, pod in enumerate(self._pod_name_list):
container = LEARNING_CONTAINER_NAME
-sub_cmd = f"python3 -m gscoordinator.learning {handle} {config} {pod_index}"
+sub_cmd = f"python3 -m gscoordinator.launch_graphlearn {handle} {config} {pod_index}"
cmd = f"kubectl -n {self._namespace} exec -it -c {container} {pod} -- {sub_cmd}"
logger.debug("launching learning server: %s", cmd)
proc = subprocess.Popen(
@@ -1321,7 +1321,7 @@ def _distribute_learning_process(
self._api_client, object_id, pod_host_ip_list
)

-def create_learning_instance(self, object_id, handle, config):
+def create_learning_instance(self, object_id, handle, config, learning_backend):
pod_name_list, _, pod_host_ip_list = self._allocate_learning_engine(object_id)
if not pod_name_list or not pod_host_ip_list:
raise RuntimeError("Failed to allocate learning engine")
coordinator/gscoordinator/{learning.py → launch_graphlearn.py} (renamed)
@@ -67,7 +67,10 @@ def launch_server(handle, config, server_index):

if __name__ == "__main__":
if len(sys.argv) < 4:
-print("Usage: ./learning.py <handle> <config> <server_index>", file=sys.stderr)
+print(
+"Usage: ./launch_graphlearn.py <handle> <config> <server_index>",
+file=sys.stderr,
+)
sys.exit(-1)

handle = decode_arg(sys.argv[1])
88 changes: 88 additions & 0 deletions coordinator/gscoordinator/launch_graphlearn_torch.py
@@ -0,0 +1,88 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright 2020 Alibaba Group Holding Limited. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import base64
import logging
import os.path as osp
import pickle
import sys

import graphscope.learning.graphlearn_torch as glt
import torch

logger = logging.getLogger("graphscope")


def decode_arg(arg):
if isinstance(arg, dict):
return arg
return pickle.loads(base64.b64decode(arg))


def run_server_proc(proc_rank, handle, config, server_rank, dataset):
glt.distributed.init_server(
num_servers=handle["num_servers"],
num_clients=handle["num_clients"],
server_rank=server_rank,
dataset=dataset,
master_addr=handle["master_addr"],
master_port=handle["server_client_master_port"],
num_rpc_threads=16,
# server_group_name="dist_train_supervised_sage_server",
)
logger.info(f"-- [Server {server_rank}] Waiting for exit ...")
glt.distributed.wait_and_shutdown_server()
logger.info(f"-- [Server {server_rank}] Exited ...")


def launch_graphlearn_torch_server(handle, config, server_rank):
logger.info(f"-- [Server {server_rank}] Initializing server ...")

edge_dir = config.pop("edge_dir")
random_node_split = config.pop("random_node_split")
dataset = glt.distributed.DistDataset(edge_dir=edge_dir)
dataset.load_vineyard(
vineyard_id=str(handle["vineyard_id"]),
vineyard_socket=handle["vineyard_socket"],
**config,
)
if random_node_split is not None:
dataset.random_node_split(**random_node_split)
logger.info(f"-- [Server {server_rank}] Initializing server ...")

torch.multiprocessing.spawn(
fn=run_server_proc, args=(handle, config, server_rank, dataset), nprocs=1
)


if __name__ == "__main__":
if len(sys.argv) < 4:
print(
"Usage: ./launch_graphlearn_torch.py <handle> <config> <server_index>",
file=sys.stderr,
)
sys.exit(-1)

handle = decode_arg(sys.argv[1])
config = decode_arg(sys.argv[2])
server_index = int(sys.argv[3])

logger.info(
f"launch_graphlearn_torch_server handle: {handle} config: {config} server_index: {server_index}"
)
launch_graphlearn_torch_server(handle, config, server_index)
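
To make the argument contract above concrete: the local launcher (later in this PR) pickles and base64-encodes the handle before putting it on the command line, and decode_arg reverses that. A small round-trip sketch; the keys are the ones this file reads, the values are illustrative assumptions:

import base64
import pickle

handle = {
    "vineyard_id": 12345,
    "vineyard_socket": "/tmp/vineyard.sock",
    "num_servers": 1,
    "num_clients": 1,
    "master_addr": "localhost",
    "server_client_master_port": 11234,
}
encoded = base64.b64encode(pickle.dumps(handle))
# decode_arg(encoded) performs exactly this inverse transformation:
assert pickle.loads(base64.b64decode(encoded)) == handle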
4 changes: 3 additions & 1 deletion coordinator/gscoordinator/launcher.py
@@ -95,7 +95,9 @@ def create_interactive_instance(
pass

@abstractmethod
-def create_learning_instance(self, object_id: int, handle: str, config: str):
+def create_learning_instance(
+self, object_id: int, handle: str, config: str, learning_backend: int
+):
pass

@abstractmethod
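
Note that the abstract signature types learning_backend as int: protobuf enum members are plain integers on the Python side, so concrete launchers can compare the value directly against message_pb2.LearningBackend members, as local_launcher.py does below. A tiny check illustrating that assumption:

from graphscope.proto import message_pb2

backend = message_pb2.LearningBackend.GRAPHLEARN_TORCH
assert isinstance(backend, int)  # proto enums are ints in Python
assert backend != message_pb2.LearningBackend.GRAPHLEARN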
75 changes: 72 additions & 3 deletions coordinator/gscoordinator/local_launcher.py
@@ -33,6 +33,7 @@
from graphscope.framework.utils import get_java_version
from graphscope.framework.utils import get_tempdir
from graphscope.framework.utils import is_free_port
from graphscope.proto import message_pb2
from graphscope.proto import types_pb2

from gscoordinator.launcher import AbstractLauncher
@@ -245,7 +246,19 @@ def _popen_helper(cmd, cwd, env, stdout=None, stderr=None):
)
return process

-def create_learning_instance(self, object_id, handle, config):
+def create_learning_instance(self, object_id, handle, config, learning_backend):
if learning_backend == message_pb2.LearningBackend.GRAPHLEARN:
return self._create_graphlearn_instance(
object_id=object_id, handle=handle, config=config
)
elif learning_backend == message_pb2.LearningBackend.GRAPHLEARN_TORCH:
return self._create_graphlearn_torch_instance(
object_id=object_id, handle=handle, config=config
)
else:
raise ValueError("invalid learning backend")

def _create_graphlearn_instance(self, object_id, handle, config):
# prepare argument
handle = json.loads(
base64.b64decode(handle.encode("utf-8", errors="ignore")).decode(
@@ -275,12 +288,12 @@ def create_learning_instance(self, object_id, handle, config):
cmd = [
sys.executable,
"-m",
"gscoordinator.learning",
"gscoordinator.launch_graphlearn",
handle,
config,
str(index),
]
-logger.debug("launching learning server: %s", " ".join(cmd))
+logger.debug("launching graphlearn server: %s", " ".join(cmd))

proc = self._popen_helper(cmd, cwd=None, env=env)
stdout_watcher = PipeWatcher(proc.stdout, sys.stdout)
@@ -289,6 +302,62 @@ def create_learning_instance(self, object_id, handle, config):
self._learning_instance_processes[object_id].append(proc)
return server_list

def _create_graphlearn_torch_instance(self, object_id, handle, config):
import pickle

handle = pickle.loads(base64.b64decode(handle))

server_client_master_port = get_free_port("localhost")
[Review comment from a collaborator] get ports for train, test and val

handle["server_client_master_port"] = server_client_master_port

server_list = [f"localhost:{server_client_master_port}"]
# for train, val and test
for _ in range(3):
server_list.append("localhost:" + str(get_free_port("localhost")))

handle = base64.b64encode(pickle.dumps(handle))

# launch the server
env = os.environ.copy()
# set coordinator dir to PYTHONPATH
python_path = (
env.get("PYTHONPATH", "")
+ os.pathsep
+ os.path.dirname(os.path.dirname(__file__))
)
env["PYTHONPATH"] = python_path

self._learning_instance_processes[object_id] = []
for index in range(self._num_workers):
cmd = [
sys.executable,
"-m",
"gscoordinator.launch_graphlearn_torch",
handle,
config,
str(index),
]
logger.debug("launching graphlearn_torch server: %s", cmd)

proc = subprocess.Popen(
cmd,
env=env,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
encoding="utf-8",
errors="replace",
universal_newlines=True,
bufsize=1,
)
stdout_watcher = PipeWatcher(
proc.stdout,
sys.stdout,
suppressed=(not logger.isEnabledFor(logging.DEBUG)),
)
setattr(proc, "stdout_watcher", stdout_watcher)
self._learning_instance_processes[object_id].append(proc)
return server_list

def close_analytical_instance(self):
self._stop_subprocess(self._analytical_engine_process, kill=True)
self._analytical_engine_endpoint = None
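
For reference, the config that _create_graphlearn_torch_instance forwards is the pickled dict that launch_graphlearn_torch_server later unpacks: it pops edge_dir and random_node_split and passes every remaining key to DistDataset.load_vineyard. A hedged sketch of such a config; only the two popped keys are confirmed by this PR, the rest are assumptions about what load_vineyard and random_node_split accept:

config = {
    "edge_dir": "out",            # popped in launch_graphlearn_torch.py
    "random_node_split": {        # forwarded to dataset.random_node_split(**...)
        "num_val": 0.1,
        "num_test": 0.1,
    },
    # any other keys are passed through as **config to dataset.load_vineyard(...)
}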