Skip to content

Commit

Permalink
Allow passing params to GIE instance (#2885)
Browse files Browse the repository at this point in the history
Allow passing a dict to customize the configuration of GIE instance.

```python
interactive = graphscope.gremlin(graph, params={'pegasus.timeout': 600000})
```

The key-value pairs are appended to the generated `frontend.vineyard.properties`
and `executor.<server_id>.vineyard.properties` files.
  • Loading branch information
siyuan0322 authored Jun 14, 2023
1 parent 17819c8 commit e4f1f23
Show file tree
Hide file tree
Showing 10 changed files with 73 additions and 16 deletions.
5 changes: 4 additions & 1 deletion coordinator/gscoordinator/coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -449,8 +449,11 @@ def _match_frontend_endpoint(pattern, lines):
# create instance
object_id = request.object_id
schema_path = request.schema_path
params = request.params
try:
proc = self._launcher.create_interactive_instance(object_id, schema_path)
proc = self._launcher.create_interactive_instance(
object_id, schema_path, params
)
gie_manager = InteractiveQueryManager(object_id)
# Put it to object_manager to ensure it could be killed during coordinator cleanup
# If coordinator is shutdown by force when creating interactive instance
Expand Down
18 changes: 15 additions & 3 deletions coordinator/gscoordinator/kubernetes_launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -605,7 +605,12 @@ def _allocate_interactive_engine(self, object_id):
return self.deploy_interactive_engine(object_id)

def _distribute_interactive_process(
self, hosts, object_id: int, schema_path: str, engine_selector: str
self,
hosts,
object_id: int,
schema_path: str,
params: dict,
engine_selector: str,
):
"""
Args:
Expand All @@ -617,6 +622,10 @@ def _distribute_interactive_process(
env = os.environ.copy()
env["GRAPHSCOPE_HOME"] = GRAPHSCOPE_HOME
container = self._engine_cluster.interactive_executor_container_name

params = "\n".join([f"{k}={v}" for k, v in params.items()])
params = base64.b64encode(params.encode("utf-8")).decode("utf-8")

cmd = [
INTERACTIVE_ENGINE_SCRIPT,
"create_gremlin_instance_on_k8s",
Expand All @@ -630,6 +639,7 @@ def _distribute_interactive_process(
str(self._interactive_port + 2), # frontend port
self._coordinator_name,
engine_selector,
params,
]
self._interactive_port += 3
logger.info("Create GIE instance with command: %s", " ".join(cmd))
Expand All @@ -648,7 +658,9 @@ def _distribute_interactive_process(
)
return process

def create_interactive_instance(self, object_id: int, schema_path: str):
def create_interactive_instance(
self, object_id: int, schema_path: str, params: dict
):
pod_name_list, _, _ = self._allocate_interactive_engine(object_id)
if not pod_name_list:
raise RuntimeError("Failed to allocate interactive engine")
Expand All @@ -661,7 +673,7 @@ def create_interactive_instance(self, object_id: int, schema_path: str):
)

return self._distribute_interactive_process(
hosts, object_id, schema_path, engine_selector
hosts, object_id, schema_path, params, engine_selector
)

def close_interactive_instance(self, object_id):
Expand Down
4 changes: 3 additions & 1 deletion coordinator/gscoordinator/launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,9 @@ def create_analytical_instance(self):
pass

@abstractmethod
def create_interactive_instance(self, object_id: int, schema_path: str):
def create_interactive_instance(
self, object_id: int, schema_path: str, params: dict
):
pass

@abstractmethod
Expand Down
8 changes: 7 additions & 1 deletion coordinator/gscoordinator/local_launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,9 @@ def create_analytical_instance(self):
"Analytical engine is listening on %s", self._analytical_engine_endpoint
)

def create_interactive_instance(self, object_id: int, schema_path: str):
def create_interactive_instance(
self, object_id: int, schema_path: str, params: dict
):
try:
logger.info("Java version: %s", get_java_version())
except: # noqa: E722
Expand All @@ -218,6 +220,9 @@ def create_interactive_instance(self, object_id: int, schema_path: str):
else:
num_workers = self._num_workers

params = "\n".join([f"{k}={v}" for k, v in params.items()])
params = base64.b64encode(params.encode("utf-8")).decode("utf-8")

cmd = [
INTERACTIVE_ENGINE_SCRIPT,
"create_gremlin_instance_on_local",
Expand All @@ -229,6 +234,7 @@ def create_interactive_instance(self, object_id: int, schema_path: str):
str(self._interactive_port + 1), # executor rpc port
str(self._interactive_port + 2 * num_workers), # frontend port
self.vineyard_socket,
params,
]
logger.info("Create GIE instance with command: %s", " ".join(cmd))
self._interactive_port += 3
Expand Down
9 changes: 9 additions & 0 deletions docs/interactive_engine/getting_started.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,15 @@ You may see something like:
The number 6 is printed, which is the number of vertices in the modern graph.
### Customize Configurations for GIE instance
You can pass additional key-value pairs to customize the startup configuration of a GIE instance, for example:
```python
# Set the timeout value to 10 min
g = gs.gremlin(graph, params={'pegasus.timeout': 600000})
```
## What's Next
As shown in the above example, it is very easy to use GraphScope to interactively query a graph using the Gremlin query language on your local machine. You may find more tutorials [here](https://tinkerpop.apache.org/docs/current/tutorials/getting-started/) for basic Gremlin usage, in which most read-only queries can be seamlessly executed with the above `g.execute()` function.
Expand Down
25 changes: 21 additions & 4 deletions interactive_engine/assembly/src/bin/graphscope/giectl
Original file line number Diff line number Diff line change
Expand Up @@ -72,18 +72,23 @@ start_frontend() {
declare -r schema_path=$3
declare -r pegasus_hosts=$4
declare -r frontend_port=$5
declare -r params=$6

declare -r threads_per_worker=${THREADS_PER_WORKER:-2}

# create related directories
declare -r log_dir=${GS_LOG}/${object_id}
declare -r config_dir=${GRAPHSCOPE_RUNTIME}/config/${object_id}
declare -r pid_dir=${GRAPHSCOPE_RUNTIME}/pid/${object_id}

mkdir -p ${log_dir} ${config_dir} ${pid_dir}

# make a "current" link
unlink ${GS_LOG}/current || true
ln -s ${log_dir} ${GS_LOG}/current

decoded_params=`echo -n $params | base64 -d`

declare java_opt="-server
-verbose:gc
-Xloggc:${log_dir}/frontend.gc.log
Expand Down Expand Up @@ -113,6 +118,8 @@ start_frontend() {
-e "s@FRONTEND_SERVICE_PORT@${frontend_port}@g" \
-e "s@THREADS_PER_WORKER@${threads_per_worker}@g" \
${GRAPHSCOPE_HOME}/conf/frontend.vineyard.properties > ${config_dir}/frontend.vineyard.properties
echo -e "\n" >> ${config_dir}/frontend.vineyard.properties
echo $decoded_params >> ${config_dir}/frontend.vineyard.properties

# frontend service hold a handle client of coordinator
java ${java_opt} \
Expand Down Expand Up @@ -142,6 +149,8 @@ start_executor() {
declare -r server_size=$4
declare -r rpc_port=$5
declare -r network_servers=$6
declare -r params=$7

declare -r threads_per_worker=${THREADS_PER_WORKER:-2}

declare -r log_dir=${GS_LOG}/${object_id}
Expand All @@ -157,6 +166,8 @@ start_executor() {
unlink ${GS_LOG}/current || true
ln -s ${log_dir} ${GS_LOG}/current

decoded_params=`echo -n $params | base64 -d`

# set executor config file
sed -e "s@GRAPH_NAME@${object_id}@g" \
-e "s@VINEYARD_OBJECT_ID@${object_id}@g" \
Expand All @@ -166,6 +177,8 @@ start_executor() {
-e "s@NETWORK_SERVERS@${network_servers}@g" \
-e "s@THREADS_PER_WORKER@${threads_per_worker}@g" \
${GRAPHSCOPE_HOME}/conf/executor.vineyard.properties > ${config_dir}/executor.$server_id.vineyard.properties
echo -e "\n" >> ${config_dir}/executor.$server_id.vineyard.properties
echo $decoded_params >> ${config_dir}/executor.$server_id.vineyard.properties

cp ${GRAPHSCOPE_HOME}/conf/log4rs.yml ${config_dir}/log4rs.yml

Expand Down Expand Up @@ -201,6 +214,7 @@ create_gremlin_instance_on_local() {
declare -r executor_rpc_port=$6
declare -r frontend_port=$7
export VINEYARD_IPC_SOCKET=$8
declare -r params=$9

declare -r cluster_type="local"
declare -r executor_count="1" # local mode only start one executor
Expand Down Expand Up @@ -228,7 +242,7 @@ create_gremlin_instance_on_local() {
pegasus_hosts=${pegasus_hosts:1}

start_frontend ${GRAPHSCOPE_RUNTIME} ${object_id} ${schema_path} ${pegasus_hosts} \
${frontend_port}
${frontend_port} ${params}

log "FRONTEND_ENDPOINT:127.0.0.1:${frontend_port}"

Expand All @@ -237,7 +251,7 @@ create_gremlin_instance_on_local() {
current_executor_port=$(($executor_port + 2 * $server_id))
current_executor_rpc_port=$(($executor_rpc_port + 2 * $server_id))
start_executor ${GRAPHSCOPE_RUNTIME} ${object_id} ${server_id} ${server_size} ${current_executor_rpc_port} \
${network_servers}
${network_servers} ${params}
done
}

Expand Down Expand Up @@ -271,6 +285,7 @@ create_gremlin_instance_on_k8s() {
declare -r frontend_port=$8
declare -r coordinator_name=$9 # deployment name of coordinator
declare -r engine_selector=${10}
declare -r params=${11}

instance_id=${coordinator_name#*-}

Expand All @@ -289,7 +304,7 @@ create_gremlin_instance_on_k8s() {

launch_frontend_cmd="GRAPHSCOPE_HOME=${GRAPHSCOPE_HOME} \
${GRAPHSCOPE_HOME}/bin/giectl start_frontend \
${GRAPHSCOPE_RUNTIME} ${object_id} ${schema_path} ${pegasus_hosts} ${frontend_port}"
${GRAPHSCOPE_RUNTIME} ${object_id} ${schema_path} ${pegasus_hosts} ${frontend_port} '${params}'"
kubectl cp ${schema_path} ${frontend_name}:${schema_path}

kubectl exec ${frontend_name} -- /bin/bash -c "${launch_frontend_cmd}"
Expand All @@ -303,7 +318,9 @@ create_gremlin_instance_on_k8s() {
_server_id=0
for pod in $(echo ${pod_hosts})
do
launch_executor_cmd="GRAPHSCOPE_HOME=${GRAPHSCOPE_HOME} ${GRAPHSCOPE_HOME}/bin/giectl start_executor ${GRAPHSCOPE_RUNTIME} ${object_id} ${_server_id} ${server_size} ${executor_rpc_port} ${network_servers}"
launch_executor_cmd="GRAPHSCOPE_HOME=${GRAPHSCOPE_HOME} ${GRAPHSCOPE_HOME}/bin/giectl \
start_executor ${GRAPHSCOPE_RUNTIME} ${object_id} ${_server_id} ${server_size} \
${executor_rpc_port} ${network_servers} '${params}'"
# kubectl exec ${pod} -c ${engine_container} -- sudo mkdir -p /var/log/graphscope
# kubectl exec ${pod} -c ${engine_container} -- sudo chown -R graphscope:graphscope /var/log/graphscope
kubectl exec ${pod} -c ${engine_container} -- /bin/bash -c "${launch_executor_cmd}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,4 @@ frontend.service.port = FRONTEND_SERVICE_PORT

# disable the authentication if username or password is not set
#auth.username = default
#auth.password = default
#auth.password = default
6 changes: 5 additions & 1 deletion python/graphscope/client/rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,10 +205,14 @@ def create_analytical_instance(self):
response = self._stub.CreateAnalyticalInstance(request)
return json.loads(response.engine_config), response.host_names

def create_interactive_instance(self, object_id, schema_path):
def create_interactive_instance(self, object_id, schema_path, params=None):
request = message_pb2.CreateInteractiveInstanceRequest(
session_id=self._session_id, object_id=object_id, schema_path=schema_path
)
if params is not None:
for k, v in params.items():
request.params[str(k)] = str(v)

response = self._stub.CreateInteractiveInstance(request)
return response.gremlin_endpoint

Expand Down
11 changes: 7 additions & 4 deletions python/graphscope/client/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -1304,7 +1304,7 @@ def _run_on_local(self):
self._config_params["port"] = None
self._config_params["vineyard_socket"] = ""

def gremlin(self, graph):
def gremlin(self, graph, params=None):
"""Get an interactive engine handler to execute gremlin queries.
It will return an instance of :class:`graphscope.interactive.query.InteractiveQuery`,
Expand All @@ -1319,6 +1319,7 @@ def gremlin(self, graph):
Args:
graph (:class:`graphscope.framework.graph.GraphDAGNode`):
The graph to create interactive instance.
params: A dict consists of configurations of GIE instance.
Raises:
InvalidArgumentError:
Expand All @@ -1343,7 +1344,9 @@ def gremlin(self, graph):

object_id = graph.vineyard_id
schema_path = graph.schema_path
endpoint = self._grpc_client.create_interactive_instance(object_id, schema_path)
endpoint = self._grpc_client.create_interactive_instance(
object_id, schema_path, params
)
interactive_query = InteractiveQuery(graph, endpoint)
self._interactive_instance_dict[object_id] = interactive_query
graph._attach_interactive_instance(interactive_query)
Expand Down Expand Up @@ -1728,7 +1731,7 @@ def g(
)


def gremlin(graph):
def gremlin(graph, params=None):
"""Create an interactive engine and get the handler to execute the gremlin queries.
See params detail in :meth:`graphscope.Session.gremlin`
Expand All @@ -1749,7 +1752,7 @@ def gremlin(graph):
assert (
graph._session is not None
), "The graph object is invalid" # pylint: disable=protected-access
return graph._session.gremlin(graph) # pylint: disable=protected-access
return graph._session.gremlin(graph, params) # pylint: disable=protected-access


def graphlearn(graph, nodes=None, edges=None, gen_labels=None):
Expand Down
1 change: 1 addition & 0 deletions python/graphscope/proto/message.proto
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ message CreateInteractiveInstanceRequest {
string session_id = 1;
int64 object_id = 2;
string schema_path = 3;
map<string, string> params = 4;
};

message CreateInteractiveInstanceResponse {
Expand Down

0 comments on commit e4f1f23

Please sign in to comment.