diff --git a/coordinator/gscoordinator/cluster_builder.py b/coordinator/gscoordinator/cluster_builder.py index e767564400af..5b9ed5c5389c 100644 --- a/coordinator/gscoordinator/cluster_builder.py +++ b/coordinator/gscoordinator/cluster_builder.py @@ -577,10 +577,13 @@ def get_interactive_frontend_deployment(self, replicas=1): self._namespace, name, deployment_spec, self._frontend_labels ) - def get_interactive_frontend_service(self, port): + def get_interactive_frontend_service(self, gremlin_port, cypher_port): name = self.frontend_deployment_name service_type = self._service_type - ports = [kube_client.V1ServicePort(name="gremlin", port=port)] + ports = [ + kube_client.V1ServicePort(name="gremlin", port=gremlin_port), + kube_client.V1ServicePort(name="cypher", port=cypher_port), + ] service_spec = ResourceBuilder.get_service_spec( service_type, ports, self._frontend_labels, None ) diff --git a/coordinator/gscoordinator/coordinator.py b/coordinator/gscoordinator/coordinator.py index 48935e16a07f..aa0e26abe5f9 100644 --- a/coordinator/gscoordinator/coordinator.py +++ b/coordinator/gscoordinator/coordinator.py @@ -64,7 +64,7 @@ from gscoordinator.object_manager import ObjectManager from gscoordinator.op_executor import OperationExecutor from gscoordinator.utils import GS_GRPC_MAX_MESSAGE_LENGTH -from gscoordinator.utils import check_gremlin_server_ready +from gscoordinator.utils import check_server_ready from gscoordinator.utils import create_single_op_dag from gscoordinator.utils import str2bool from gscoordinator.version import __version__ @@ -441,10 +441,16 @@ def _match_frontend_endpoint(pattern, lines): return "" # frontend endpoint pattern - FRONTEND_PATTERN = re.compile("(?<=FRONTEND_ENDPOINT:).*$") + FRONTEND_GREMLIN_PATTERN = re.compile("(?<=FRONTEND_GREMLIN_ENDPOINT:).*$") + FRONTEND_CYPHER_PATTERN = re.compile("(?<=FRONTEND_CYPHER_ENDPOINT:).*$") # frontend external endpoint, for clients that are outside of cluster to connect # only available in kubernetes mode, exposed by NodePort or LoadBalancer - FRONTEND_EXTERNAL_PATTERN = re.compile("(?<=FRONTEND_EXTERNAL_ENDPOINT:).*$") + FRONTEND_EXTERNAL_GREMLIN_PATTERN = re.compile( + "(?<=FRONTEND_EXTERNAL_GREMLIN_ENDPOINT:).*$" + ) + FRONTEND_EXTERNAL_CYPHER_PATTERN = re.compile( + "(?<=FRONTEND_EXTERNAL_CYPHER_ENDPOINT:).*$" + ) # create instance object_id = request.object_id @@ -464,13 +470,22 @@ def _match_frontend_endpoint(pattern, lines): return_code = proc.poll() if return_code != 0: raise RuntimeError(f"Error code: {return_code}, message {outs}") - # match frontend endpoint and check for ready - endpoint = _match_frontend_endpoint(FRONTEND_PATTERN, outs) + # match frontend endpoints and check for ready + gremlin_endpoint = _match_frontend_endpoint(FRONTEND_GREMLIN_PATTERN, outs) + cypher_endpoint = _match_frontend_endpoint(FRONTEND_CYPHER_PATTERN, outs) + logger.debug("Got endpoints: %s %s", gremlin_endpoint, cypher_endpoint) # coordinator use internal endpoint - gie_manager.set_endpoint(endpoint) - if check_gremlin_server_ready(endpoint): # throws TimeoutError + gie_manager.set_endpoint(gremlin_endpoint) + if check_server_ready( + gremlin_endpoint, server="gremlin" + ) and check_server_ready( + cypher_endpoint, server="cypher" + ): # throws TimeoutError logger.info( - "Built interactive frontend %s for graph %ld", endpoint, object_id + "Built interactive frontend gremlin: %s & cypher: %s for graph %ld", + gremlin_endpoint, + cypher_endpoint, + object_id, ) except Exception as e: context.set_code(grpc.StatusCode.ABORTED) @@ -480,11 +495,25 @@ def _match_frontend_endpoint(pattern, lines): self._launcher.close_interactive_instance(object_id) self._object_manager.pop(object_id) return message_pb2.CreateInteractiveInstanceResponse() - external_endpoint = _match_frontend_endpoint(FRONTEND_EXTERNAL_PATTERN, outs) + external_gremlin_endpoint = _match_frontend_endpoint( + FRONTEND_EXTERNAL_GREMLIN_PATTERN, outs + ) + external_cypher_endpoint = _match_frontend_endpoint( + FRONTEND_EXTERNAL_CYPHER_PATTERN, outs + ) + logger.debug( + "Got external endpoints: %s %s", + external_gremlin_endpoint, + external_cypher_endpoint, + ) + # client use external endpoint (k8s mode), or internal endpoint (standalone mode) - endpoint = external_endpoint or endpoint + gremlin_endpoint = external_gremlin_endpoint or gremlin_endpoint + cypher_endpoint = external_cypher_endpoint or cypher_endpoint return message_pb2.CreateInteractiveInstanceResponse( - gremlin_endpoint=endpoint, object_id=object_id + gremlin_endpoint=gremlin_endpoint, + cypher_endpoint=cypher_endpoint, + object_id=object_id, ) def CreateLearningInstance(self, request, context): diff --git a/coordinator/gscoordinator/kubernetes_launcher.py b/coordinator/gscoordinator/kubernetes_launcher.py index cdb62738bd6b..071bd801d9a6 100644 --- a/coordinator/gscoordinator/kubernetes_launcher.py +++ b/coordinator/gscoordinator/kubernetes_launcher.py @@ -636,12 +636,13 @@ def _distribute_interactive_process( container, str(self._interactive_port), # executor port str(self._interactive_port + 1), # executor rpc port - str(self._interactive_port + 2), # frontend port + str(self._interactive_port + 2), # frontend gremlin port + str(self._interactive_port + 3), # frontend cypher port self._coordinator_name, engine_selector, params, ] - self._interactive_port += 3 + self._interactive_port += 4 logger.info("Create GIE instance with command: %s", " ".join(cmd)) process = subprocess.Popen( cmd, @@ -947,7 +948,7 @@ def _create_frontend_deployment(self, name=None, owner_references=None): def _create_frontend_service(self): logger.info("Creating frontend service...") - service = self._engine_cluster.get_interactive_frontend_service(8233) + service = self._engine_cluster.get_interactive_frontend_service(8233, 7687) service.metadata.owner_references = self._owner_references response = self._core_api.create_namespaced_service(self._namespace, service) self._resource_object.append(response) diff --git a/coordinator/gscoordinator/local_launcher.py b/coordinator/gscoordinator/local_launcher.py index c08e77bf73db..1be951189890 100644 --- a/coordinator/gscoordinator/local_launcher.py +++ b/coordinator/gscoordinator/local_launcher.py @@ -232,12 +232,13 @@ def create_interactive_instance( str(num_workers), # server size str(self._interactive_port), # executor port str(self._interactive_port + 1), # executor rpc port - str(self._interactive_port + 2 * num_workers), # frontend port + str(self._interactive_port + 2 * num_workers), # frontend gremlin port + str(self._interactive_port + 2 * num_workers + 1), # frontend cypher port self.vineyard_socket, params, ] logger.info("Create GIE instance with command: %s", " ".join(cmd)) - self._interactive_port += 3 + self._interactive_port += 2 * num_workers + 2 process = subprocess.Popen( cmd, start_new_session=True, diff --git a/coordinator/gscoordinator/utils.py b/coordinator/gscoordinator/utils.py index 100936fd0306..d20cfe611570 100644 --- a/coordinator/gscoordinator/utils.py +++ b/coordinator/gscoordinator/utils.py @@ -1996,8 +1996,8 @@ def check_argument(condition, message=None): raise ValueError(f"Check failed: {message}") -def check_gremlin_server_ready(endpoint): - def _check_task(endpoint): +def check_server_ready(endpoint, server="gremlin"): + def _check_gremlin_task(endpoint): from gremlin_python.driver.client import Client if "MY_POD_NAME" in os.environ: @@ -2005,12 +2005,14 @@ def _check_task(endpoint): if endpoint == "localhost" or endpoint == "127.0.0.1": # now, used in macOS with docker-desktop kubernetes cluster, # which external ip is 'localhost' when service type is 'LoadBalancer' + logger.info("In kubernetes env, gremlin server is ready.") return True try: client = Client(f"ws://{endpoint}/gremlin", "g") # May throw client.submit("g.V().limit(1)").all().result() + logger.info("Gremlin server is ready.") finally: try: client.close() @@ -2018,11 +2020,40 @@ def _check_task(endpoint): pass return True + def _check_cypher_task(endpoint): + from neo4j import GraphDatabase + + if "MY_POD_NAME" in os.environ: + # inner kubernetes env + if endpoint == "localhost" or endpoint == "127.0.0.1": + logger.info("In kubernetes env, cypher server is ready.") + return True + + try: + logger.debug("Try to connect to cypher server.") + driver = GraphDatabase.driver(f"neo4j://{endpoint}", auth=("", "")) + # May throw + driver.verify_connectivity() + logger.info("Checked connectivity to cypher server.") + finally: + try: + driver.close() + except: # noqa: E722 + pass + return True + executor = ThreadPoolExecutor(max_workers=20) begin_time = time.time() while True: - t = executor.submit(_check_task, endpoint) + if server == "gremlin": + t = executor.submit(_check_gremlin_task, endpoint) + elif server == "cypher": + t = executor.submit(_check_cypher_task, endpoint) + else: + raise ValueError( + f"Unsupported server type: {server} other than 'gremlin' or 'cypher'" + ) try: _ = t.result(timeout=30) except Exception as e: @@ -2034,4 +2065,6 @@ def _check_task(endpoint): time.sleep(3) if time.time() - begin_time > INTERACTIVE_INSTANCE_TIMEOUT_SECONDS: executor.shutdown(wait=False) - raise TimeoutError(f"Gremlin check query failed: {error_message}") + raise TimeoutError( + f"{server.capitalize()} check query failed: {error_message}" + ) diff --git a/interactive_engine/assembly/src/bin/graphscope/giectl b/interactive_engine/assembly/src/bin/graphscope/giectl index a5343be2329d..1d45b8df9909 100755 --- a/interactive_engine/assembly/src/bin/graphscope/giectl +++ b/interactive_engine/assembly/src/bin/graphscope/giectl @@ -64,15 +64,17 @@ END # schema_path: path of graph schema file # pegasus_hosts: hosts and port of executor # executor_count: number of executor, equal to engine count -# frontend_port: frontend port +# frontend_gremlin_port: frontend gremlin port +# frontend_cypher_port: frontend cypher port ########################## start_frontend() { declare -r GRAPHSCOPE_RUNTIME=$1 declare -r object_id=$2 declare -r schema_path=$3 declare -r pegasus_hosts=$4 - declare -r frontend_port=$5 - declare -r params=$6 + declare -r frontend_gremlin_port=$5 + declare -r frontend_cypher_port=$6 + declare -r params=$7 declare -r threads_per_worker=${THREADS_PER_WORKER:-2} @@ -115,7 +117,8 @@ start_frontend() { sed -e "s@GRAPH_NAME@${object_id}@g" \ -e "s@GRAPH_SCHEMA@${schema_path}@g" \ -e "s@PEGASUS_HOSTS@${pegasus_hosts}@g" \ - -e "s@FRONTEND_SERVICE_PORT@${frontend_port}@g" \ + -e "s@FRONTEND_GREMLIN_PORT@${frontend_gremlin_port}@g" \ + -e "s@FRONTEND_CYPHER_PORT@${frontend_cypher_port}@g" \ -e "s@THREADS_PER_WORKER@${threads_per_worker}@g" \ ${GRAPHSCOPE_HOME}/conf/frontend.vineyard.properties > ${config_dir}/frontend.vineyard.properties echo -e "\n" >> ${config_dir}/frontend.vineyard.properties @@ -202,7 +205,8 @@ start_executor() { # server_id: global server id of executor # executor_port # executor_rpc_port -# frontend_port +# frontend_gremlin_port +# frontend_cypher_port # vineyard_ipc_socket ########################## create_gremlin_instance_on_local() { @@ -212,9 +216,10 @@ create_gremlin_instance_on_local() { declare -r server_size=$4 declare -r executor_port=$5 declare -r executor_rpc_port=$6 - declare -r frontend_port=$7 - export VINEYARD_IPC_SOCKET=$8 - declare -r params=$9 + declare -r frontend_gremlin_port=$7 + declare -r frontend_cypher_port=$8 + export VINEYARD_IPC_SOCKET=$9 + declare -r params=${10} declare -r cluster_type="local" declare -r executor_count="1" # local mode only start one executor @@ -242,9 +247,10 @@ create_gremlin_instance_on_local() { pegasus_hosts=${pegasus_hosts:1} start_frontend ${GRAPHSCOPE_RUNTIME} ${object_id} ${schema_path} ${pegasus_hosts} \ - ${frontend_port} ${params} + ${frontend_gremlin_port} ${frontend_cypher_port} ${params} - log "FRONTEND_ENDPOINT:127.0.0.1:${frontend_port}" + log "FRONTEND_GREMLIN_ENDPOINT:127.0.0.1:${frontend_gremlin_port}" + log "FRONTEND_CYPHER_ENDPOINT:127.0.0.1:${frontend_cypher_port}" # executor use executor inner port for server_id in $(seq 0 $(($server_size - 1))); do @@ -255,6 +261,84 @@ create_gremlin_instance_on_local() { done } +########################## +# create k8s service +# Globals: +# GREMLIN_EXPOSE +# Arguments: +# lang: gremlin or cypher +# object_id: id of vineyard object +# instance_id: id of instance +# frontend_deployment_name: name of frontend deployment +# frontend_port: port of frontend +# frontend_external_port: external port of frontend +########################## +create_k8s_service() { + declare -r lang=$1 + declare -r object_id=$2 + declare -r instance_id=$3 + declare -r frontend_deployment_name=$4 + declare -r frontend_port=$5 + declare frontend_external_port=$6 + + if [ "${GREMLIN_EXPOSE}" = "LoadBalancer" ]; then + kubectl expose deployment ${frontend_deployment_name} --name=${lang}-${object_id} --port=${frontend_external_port} \ + --target-port=${frontend_port} --type=LoadBalancer 1>/dev/null 2>&1 + [ $? -eq 0 ] || exit 1 + wait_period_seconds=0 + while true + do + external_ip=$(kubectl get service ${lang}-${object_id} -ojsonpath='{.status.loadBalancer.ingress[0].ip}') || true + if [ -n "${external_ip}" ]; then + break + fi + wait_period_seconds=$(($wait_period_seconds+5)) + if [ ${wait_period_seconds} -gt ${timeout_seconds} ];then + echo "Get ${lang} external ip of ${GREMLIN_EXPOSE} failed." + break + fi + sleep 5 + done + else + # NodePort service type + kubectl expose deployment ${frontend_deployment_name} --name=${lang}-${object_id} --port=${frontend_external_port} \ + --target-port=${frontend_port} --type=NodePort 1>/dev/null 2>&1 + [ $? -eq 0 ] || exit 1 + wait_period_seconds=0 + while true + do + frontend_external_port=$(kubectl describe services ${lang}-${object_id} | grep "NodePort" | grep "TCP" | tr -cd "0-9") + if [ -n "${frontend_external_port}" ]; then + break + fi + wait_period_seconds=$(($wait_period_seconds+5)) + if [ ${wait_period_seconds} -gt ${timeout_seconds} ];then + log "Get interactive engine frontend node port failed." + break + fi + sleep 5 + done + wait_period_seconds=0 + while true + do + external_ip=$(kubectl describe pods ${frontend_deployment_name} | grep "Node:" | head -1 | awk -F '[ /]+' '{print $3}') + if [ -n "${external_ip}" ]; then + break + fi + wait_period_seconds=$(($wait_period_seconds+5)) + if [ ${wait_period_seconds} -gt ${timeout_seconds} ];then + log "Get interactive engine frontend ${lang} host ip of ${GREMLIN_EXPOSE} failed." + break + fi + sleep 5 + done + fi + # currently support only 1 pod. + frontend_ip=$(kubectl get pod -lapp.kubernetes.io/component=frontend,app.kubernetes.io/instance=${instance_id} -o jsonpath='{.items[*].status.podIP}') + log "FRONTEND_${lang^^}_ENDPOINT:${frontend_ip}:${frontend_port}" + log "FRONTEND_EXTERNAL_${lang^^}_ENDPOINT:${external_ip}:${frontend_external_port}" +} + ########################## # create interactive engine instance on k8s. # Globals: @@ -267,7 +351,8 @@ create_gremlin_instance_on_local() { # engine_container: container name of engine # executor_port # executor_rpc_port -# frontend_port +# frontend_gremlin_port +# frontend_cypher_port # coordinator_name: name of coordinator deployment object in k8s # engine_selector: the label name of engine selector ########################## @@ -282,10 +367,11 @@ create_gremlin_instance_on_k8s() { declare -r engine_container=$5 declare -r executor_port=$6 declare -r executor_rpc_port=$7 - declare -r frontend_port=$8 - declare -r coordinator_name=$9 # deployment name of coordinator - declare -r engine_selector=${10} - declare -r params=${11} + declare -r frontend_gremlin_port=$8 + declare -r frontend_cypher_port=$9 + declare -r coordinator_name=${10} # deployment name of coordinator + declare -r engine_selector=${11} + declare -r params=${12} instance_id=${coordinator_name#*-} @@ -304,9 +390,12 @@ create_gremlin_instance_on_k8s() { launch_frontend_cmd="GRAPHSCOPE_HOME=${GRAPHSCOPE_HOME} \ ${GRAPHSCOPE_HOME}/bin/giectl start_frontend \ - ${GRAPHSCOPE_RUNTIME} ${object_id} ${schema_path} ${pegasus_hosts} ${frontend_port} '${params}'" + ${GRAPHSCOPE_RUNTIME} ${object_id} ${schema_path} ${pegasus_hosts} ${frontend_gremlin_port} ${frontend_cypher_port} '${params}'" kubectl cp ${schema_path} ${frontend_name}:${schema_path} + log "${frontend_name}" + log "${launch_frontend_cmd}" + kubectl exec ${frontend_name} -- /bin/bash -c "${launch_frontend_cmd}" network_servers="" @@ -330,65 +419,13 @@ create_gremlin_instance_on_k8s() { log "Expose gremlin server." timeout_seconds=60 # random from range [50001, 51000) for interactive engine - frontend_external_port=$(( ((RANDOM<<15)|RANDOM) % 1000 + 50000 )) + frontend_external_gremlin_port=$(( ((RANDOM<<15)|RANDOM) % 1000 + 50000 )) + frontend_external_cypher_port=$(( ((RANDOM<<15)|RANDOM) % 1000 + 50000 )) frontend_deployment_name=$(kubectl get deployment -lapp.kubernetes.io/component=frontend,app.kubernetes.io/instance=${instance_id},app.kubernetes.io/engine_selector=${engine_selector} -o jsonpath='{.items[*].metadata.name}') - if [ "${GREMLIN_EXPOSE}" = "LoadBalancer" ]; then - kubectl expose deployment ${frontend_deployment_name} --name=gremlin-${object_id} --port=${frontend_external_port} \ - --target-port=${frontend_port} --type=LoadBalancer 1>/dev/null 2>&1 - [ $? -eq 0 ] || exit 1 - wait_period_seconds=0 - while true - do - external_ip=$(kubectl get service gremlin-${object_id} -ojsonpath='{.status.loadBalancer.ingress[0].ip}') || true - if [ -n "${external_ip}" ]; then - break - fi - wait_period_seconds=$(($wait_period_seconds+5)) - if [ ${wait_period_seconds} -gt ${timeout_seconds} ];then - echo "Get external ip of ${GREMLIN_EXPOSE} failed." - break - fi - sleep 5 - done - else - # NodePort service type - # expose gremlin service - kubectl expose deployment ${frontend_deployment_name} --name=gremlin-${object_id} --port=${frontend_external_port} \ - --target-port=${frontend_port} --type=NodePort 1>/dev/null 2>&1 - [ $? -eq 0 ] || exit 1 - wait_period_seconds=0 - while true - do - frontend_external_port=$(kubectl describe services gremlin-${object_id} | grep "NodePort" | grep "TCP" | tr -cd "0-9") - if [ -n "${frontend_external_port}" ]; then - break - fi - wait_period_seconds=$(($wait_period_seconds+5)) - if [ ${wait_period_seconds} -gt ${timeout_seconds} ];then - log "Get interactive engine frontend node port failed." - break - fi - sleep 5 - done - wait_period_seconds=0 - while true - do - external_ip=$(kubectl describe pods ${frontend_deployment_name} | grep "Node:" | head -1 | awk -F '[ /]+' '{print $3}') - if [ -n "${external_ip}" ]; then - break - fi - wait_period_seconds=$(($wait_period_seconds+5)) - if [ ${wait_period_seconds} -gt ${timeout_seconds} ];then - log "Get interactive engine frontend host ip of ${GREMLIN_EXPOSE} failed." - break - fi - sleep 5 - done - fi - # currently support only 1 pod. - frontend_ip=$(kubectl get pod -lapp.kubernetes.io/component=frontend,app.kubernetes.io/instance=${instance_id} -o jsonpath='{.items[*].status.podIP}') - log "FRONTEND_ENDPOINT:${frontend_ip}:${frontend_port}" - log "FRONTEND_EXTERNAL_ENDPOINT:${external_ip}:${frontend_external_port}" + + create_k8s_service gremlin ${object_id} ${instance_id} ${frontend_deployment_name} ${frontend_gremlin_port} ${frontend_external_gremlin_port} + create_k8s_service cypher ${object_id} ${instance_id} ${frontend_deployment_name} ${frontend_cypher_port} ${frontend_external_cypher_port} + exit 0 } @@ -441,6 +478,8 @@ close_gremlin_instance_on_k8s() { # delete service log "delete gremlin service" kubectl delete service gremlin-${object_id} || true + log "delete cypher service" + kubectl delete service cypher-${object_id} || true # kill frontend and coordinator process log "Close frontend process." diff --git a/interactive_engine/assembly/src/conf/graphscope/frontend.vineyard.properties b/interactive_engine/assembly/src/conf/graphscope/frontend.vineyard.properties index c99f266db329..62cc062d741a 100644 --- a/interactive_engine/assembly/src/conf/graphscope/frontend.vineyard.properties +++ b/interactive_engine/assembly/src/conf/graphscope/frontend.vineyard.properties @@ -14,8 +14,9 @@ pegasus.hosts = PEGASUS_HOSTS graph.schema = GRAPH_SCHEMA ## Frontend Config -gremlin.server.port = FRONTEND_SERVICE_PORT +gremlin.server.port = FRONTEND_GREMLIN_PORT +neo4j.bolt.server.port = FRONTEND_CYPHER_PORT # disable the authentication if username or password is not set #auth.username = default #auth.password = default diff --git a/interactive_engine/compiler/src/main/resources/conf/neo4j.conf b/interactive_engine/compiler/src/main/resources/conf/neo4j.conf index d6ed7ba14ad9..3f441aa31f18 100644 --- a/interactive_engine/compiler/src/main/resources/conf/neo4j.conf +++ b/interactive_engine/compiler/src/main/resources/conf/neo4j.conf @@ -68,7 +68,7 @@ dbms.tx_state.memory_allocation=ON_HEAP # With default configuration Neo4j only accepts local connections. # To accept non-local connections, uncomment this line: -#dbms.default_listen_address=0.0.0.0 +dbms.default_listen_address=0.0.0.0 # You can also choose a specific network interface, and configure a non-default # port for each connector, by setting their individual listen_address. diff --git a/k8s/dockerfiles/coordinator.Dockerfile b/k8s/dockerfiles/coordinator.Dockerfile index ba638ba2adcd..6258c1e5094f 100644 --- a/k8s/dockerfiles/coordinator.Dockerfile +++ b/k8s/dockerfiles/coordinator.Dockerfile @@ -51,6 +51,9 @@ COPY ./interactive_engine/assembly/src/bin/graphscope/giectl /opt/graphscope/bin COPY ./k8s/utils/kube_ssh /usr/local/bin/kube_ssh RUN sudo chmod a+wrx /tmp +#to make sure neo4j==5.10.0 can be installed +RUN pip3 install pip==20.3.4 + USER graphscope WORKDIR /home/graphscope diff --git a/k8s/dockerfiles/learning.Dockerfile b/k8s/dockerfiles/learning.Dockerfile index 73178524ced8..7edbffa86e80 100644 --- a/k8s/dockerfiles/learning.Dockerfile +++ b/k8s/dockerfiles/learning.Dockerfile @@ -38,6 +38,9 @@ RUN sudo apt-get update -y && \ RUN sudo chmod a+wrx /tmp +#to make sure neo4j==5.10.0 can be installed +RUN pip3 install pip==20.3.4 + COPY --from=builder /home/graphscope/install /opt/graphscope/ RUN python3 -m pip install --no-cache-dir /opt/graphscope/*.whl && sudo rm -rf /opt/graphscope/*.whl diff --git a/learning_engine/graph-learn b/learning_engine/graph-learn index 4481923fc4d1..8267d739d266 160000 --- a/learning_engine/graph-learn +++ b/learning_engine/graph-learn @@ -1 +1 @@ -Subproject commit 4481923fc4d1a54ec82112797c573f7add46c539 +Subproject commit 8267d739d266118a91c94bae34c4a429d92fa4d8 diff --git a/python/graphscope/__init__.py b/python/graphscope/__init__.py index 13a1ce379a98..5d7bf6d71d39 100644 --- a/python/graphscope/__init__.py +++ b/python/graphscope/__init__.py @@ -44,6 +44,7 @@ from graphscope.client.session import graphlearn from graphscope.client.session import gremlin from graphscope.client.session import has_default_session +from graphscope.client.session import interactive from graphscope.client.session import session from graphscope.client.session import set_option from graphscope.framework.errors import * diff --git a/python/graphscope/client/rpc.py b/python/graphscope/client/rpc.py index ed259659c0a8..fcf29a4bc50e 100644 --- a/python/graphscope/client/rpc.py +++ b/python/graphscope/client/rpc.py @@ -214,7 +214,7 @@ def create_interactive_instance(self, object_id, schema_path, params=None): request.params[str(k)] = str(v) response = self._stub.CreateInteractiveInstance(request) - return response.gremlin_endpoint + return response.gremlin_endpoint, response.cypher_endpoint def create_learning_instance(self, object_id, handle, config): request = message_pb2.CreateLearningInstanceRequest(session_id=self._session_id) diff --git a/python/graphscope/client/session.py b/python/graphscope/client/session.py index a5fc91aac705..10c8efc0cadd 100755 --- a/python/graphscope/client/session.py +++ b/python/graphscope/client/session.py @@ -1294,16 +1294,23 @@ def _run_on_local(self): self._config_params["vineyard_socket"] = "" def gremlin(self, graph, params=None): - """Get an interactive engine handler to execute gremlin queries. + """This method is going to be deprecated. + Use :meth:`interactive` to get an interactive engine handler supports + both gremlin and cypher queries + """ + return self.interactive(graph, params) + + def interactive(self, graph, params=None): + """Get an interactive engine handler to execute gremlin and cypher queries. It will return an instance of :class:`graphscope.interactive.query.InteractiveQuery`, .. code:: python >>> # close and recreate InteractiveQuery. - >>> interactive_query = sess.gremlin(g) + >>> interactive_query = sess.interactive(g) >>> interactive_query.close() - >>> interactive_query = sess.gremlin(g) + >>> interactive_query = sess.interactive(g) Args: graph (:class:`graphscope.framework.graph.GraphDAGNode`): @@ -1316,7 +1323,7 @@ def gremlin(self, graph, params=None): Returns: :class:`graphscope.interactive.query.InteractiveQuery`: - InteractiveQuery to execute gremlin queries. + InteractiveQuery to execute gremlin and cypher queries. """ if self._session_id != graph.session_id: raise RuntimeError( @@ -1333,10 +1340,13 @@ def gremlin(self, graph, params=None): object_id = graph.vineyard_id schema_path = graph.schema_path - endpoint = self._grpc_client.create_interactive_instance( + ( + gremlin_endpoint, + cypher_endpoint, + ) = self._grpc_client.create_interactive_instance( object_id, schema_path, params ) - interactive_query = InteractiveQuery(graph, endpoint) + interactive_query = InteractiveQuery(graph, gremlin_endpoint, cypher_endpoint) self._interactive_instance_dict[object_id] = interactive_query graph._attach_interactive_instance(interactive_query) return interactive_query @@ -1721,13 +1731,20 @@ def g( def gremlin(graph, params=None): - """Create an interactive engine and get the handler to execute the gremlin queries. + """This method is going to be deprecated in the future. + Use :meth:`graphscope.interactive` instead. + """ + return interactive(graph, params) + + +def interactive(graph, params=None): + """Create an interactive engine and get the handler to execute gremlin and cypher queries. - See params detail in :meth:`graphscope.Session.gremlin` + See params detail in :meth:`graphscope.Session.interactive` Returns: :class:`graphscope.interactive.query.InteractiveQueryDAGNode`: - InteractiveQuery to execute gremlin queries, evaluated in eager mode. + InteractiveQuery to execute gremlin and cypher queries, evaluated in eager mode. Examples: @@ -1735,13 +1752,13 @@ def gremlin(graph, params=None): >>> import graphscope >>> g = graphscope.g() - >>> interactive_query = graphscope.gremlin() + >>> interactive_query = graphscope.interactive() """ assert graph is not None, "graph cannot be None" assert ( graph._session is not None ), "The graph object is invalid" # pylint: disable=protected-access - return graph._session.gremlin(graph, params) # pylint: disable=protected-access + return graph._session.interactive(graph, params) # pylint: disable=protected-access def graphlearn(graph, nodes=None, edges=None, gen_labels=None): diff --git a/python/graphscope/interactive/query.py b/python/graphscope/interactive/query.py index b6d01c3b3acb..9ff440be2f81 100644 --- a/python/graphscope/interactive/query.py +++ b/python/graphscope/interactive/query.py @@ -44,7 +44,7 @@ class InteractiveQuery(object): `Gremlin-Python `_, which implements Gremlin within the Python language. It also can expose gremlin endpoint which can be used by - any other standard gremlin console, with the method `graph_url()`. + any other standard gremlin console, with the method `gremlin_url()`. It also has a method called `subgraph` which can extract some fragments from origin graph, produce a new, smaller but concise graph stored in vineyard, @@ -54,22 +54,45 @@ class InteractiveQuery(object): to get a `GraphTraversalSource` for further traversal. """ - def __init__(self, graph, frontend_endpoint): + def __init__(self, graph, frontend_gremlin_endpoint, frontend_cypher_endpoint): """Construct a :class:`InteractiveQuery` object.""" # graph object id stored in vineyard self._graph = graph self._session = graph._session - frontend_endpoint = frontend_endpoint.split(",") - self._graph_url = [f"ws://{endpoint}/gremlin" for endpoint in frontend_endpoint] + frontend_gremlin_endpoint = frontend_gremlin_endpoint.split(",") + self._gremlin_url = [ + f"ws://{endpoint}/gremlin" for endpoint in frontend_gremlin_endpoint + ] + frontend_cypher_endpoint = frontend_cypher_endpoint.split(",") + self._cypher_url = [ + f"neo4j://{endpoint}" for endpoint in frontend_cypher_endpoint + ] self._conn = None self._gremlin_client = None + self._cypher_driver = None self.closed = False + @property + def _graph_url(self): + """This will be deprecated in the future, use `_gremlin_url` instead.""" + return self._gremlin_url + @property def graph_url(self): + """This will be deprecated in the future, use `gremlin_url` instead.""" + return self._gremlin_url + + @property + def gremlin_url(self): """The gremlin graph url can be used with any standard gremlin console, e.g., tinkerpop.""" - return self._graph_url + return self._gremlin_url + + @property + def cypher_url(self): + """The cypher graph url can be used with any standard cypher console, + e.g., neo4j.""" + return self._cypher_url @property def object_id(self): @@ -83,13 +106,14 @@ def session(self): def session_id(self): return self._session.session_id - def execute(self, query, request_options=None): + def execute(self, query, lang="gremlin", request_options=None, **kwargs): """A simple wrapper around `submit`, for compatibility""" - return self.submit(query, request_options=request_options) + return self.submit(query, lang, request_options=request_options, **kwargs) - def submit(self, query, request_options=None): - """Execute gremlin querying scripts. + def submit(self, query, lang="gremlin", request_options=None, **kwargs): + """Execute gremlin or cypher querying scripts. + Args: query (str): Scripts that written in gremlin query language. request_options (dict, optional): Gremlin request options. format: @@ -99,17 +123,45 @@ def submit(self, query, request_options=None): Returns: :class:`gremlin_python.driver.client.ResultSet`: + + + Args: + query (str): Scripts that written in cypher query language. + kwargs (dict, optional): Cypher request options. e.g.: + routing_ = RoutingControl.READ + + Returns: + :class:`neo4j.work.result.Result`: """ - return self.gremlin_client.submit(query, request_options=request_options) + if lang == "gremlin": + return self.gremlin_client.submit(query, request_options=request_options) + elif lang == "cypher": + return self.cypher_driver.execute_query(query, **kwargs) + else: + raise ValueError( + f"Unsupported query language: {lang} other than gremlin and cypher" + ) @property def gremlin_client(self): if self._gremlin_client is None: - self._gremlin_client = Client(self._graph_url[0], "g") + self._gremlin_client = Client(self._gremlin_url[0], "g") return self._gremlin_client - def subgraph(self, gremlin_script, request_options=None): - """Create a subgraph, which input is the executor result of `gremlin_script`. + @property + def cypher_driver(self): + from neo4j import GraphDatabase + + if self._cypher_driver is None: + self._cypher_driver = GraphDatabase.driver( + self._cypher_url[0], auth=("", "") + ) + return self._cypher_driver + + def subgraph(self, gremlin_script, lang="gremlin", request_options=None): + """We currently only support subgraph using gremlin script. + + Create a subgraph, which input is the executor result of `gremlin_script`. Any gremlin script that output a set of edges can be used to construct a subgraph. @@ -124,6 +176,7 @@ def subgraph(self, gremlin_script, request_options=None): :class:`graphscope.framework.graph.GraphDAGNode`: A new graph constructed by the gremlin output, that also stored in vineyard. """ + assert lang == "gremlin", "Only support gremlin script" # avoid circular import from graphscope.framework.graph import GraphDAGNode @@ -136,7 +189,9 @@ def subgraph(self, gremlin_script, request_options=None): return self._session._wrapper(GraphDAGNode(self._session, op)) def traversal_source(self): - """Create a GraphTraversalSource and return. + """We currently only support traversal_source using gremlin. + + Create a GraphTraversalSource and return. Once `g` has been created using a connection, we can start to write Gremlin traversals to query the remote graph. @@ -158,7 +213,7 @@ def traversal_source(self): `GraphTraversalSource` """ if self._conn is None: - self._conn = DriverRemoteConnection(self._graph_url[0], "g") + self._conn = DriverRemoteConnection(self._gremlin_url[0], "g") return traversal().withRemote(self._conn) def close(self): @@ -175,5 +230,10 @@ def close(self): self._gremlin_client.close() except: # noqa: E722 pass + if self._cypher_driver is not None: + try: + self._cypher_driver.close() + except: # noqa: E722 + pass self._session._close_interactive_instance(self) self.closed = True diff --git a/python/graphscope/proto/message.proto b/python/graphscope/proto/message.proto index efc63e4f1065..3509d7aead60 100644 --- a/python/graphscope/proto/message.proto +++ b/python/graphscope/proto/message.proto @@ -189,7 +189,8 @@ message CreateInteractiveInstanceRequest { message CreateInteractiveInstanceResponse { string gremlin_endpoint = 1; - int64 object_id = 2; + string cypher_endpoint = 2; + int64 object_id = 3; }; message CreateLearningInstanceRequest { diff --git a/python/graphscope/tests/unittest/test_lazy.py b/python/graphscope/tests/unittest/test_lazy.py index f896a6c7db54..c14f80384c20 100644 --- a/python/graphscope/tests/unittest/test_lazy.py +++ b/python/graphscope/tests/unittest/test_lazy.py @@ -228,3 +228,13 @@ def test_across_engine(sess): res = interactive.execute("g.V().count()").all().result() # res = sess.run(res) assert res[0] == 62586 + + +def test_cypher_endpoint(sess): + from neo4j import RoutingControl + + g_node = load_p2p_network(sess) + interactive = sess.interactive(g_node) + _ = interactive.execute( + "MATCH (n) RETURN n LIMIT 1", lang="cypher", routing_=RoutingControl.READ + ) diff --git a/python/requirements.txt b/python/requirements.txt index a0984360094c..f75ab21d7f59 100644 --- a/python/requirements.txt +++ b/python/requirements.txt @@ -5,6 +5,7 @@ grpcio-tools>=1.49 kubernetes>=24.2.0 msgpack>=1.0.5 mypy-protobuf>=3.4.0 +neo4j==5.10.0 nest_asyncio networkx==2.8.0;python_version>="3.8" networkx==2.6.0;python_version<"3.8"