Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support cypher query in python #2952

Merged
merged 16 commits into from
Jul 6, 2023
Merged
7 changes: 5 additions & 2 deletions coordinator/gscoordinator/cluster_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -577,10 +577,13 @@ def get_interactive_frontend_deployment(self, replicas=1):
self._namespace, name, deployment_spec, self._frontend_labels
)

def get_interactive_frontend_service(self, port):
def get_interactive_frontend_service(self, gremlin_port, cypher_port):
name = self.frontend_deployment_name
service_type = self._service_type
ports = [kube_client.V1ServicePort(name="gremlin", port=port)]
ports = [
kube_client.V1ServicePort(name="gremlin", port=gremlin_port),
kube_client.V1ServicePort(name="cypher", port=cypher_port),
]
service_spec = ResourceBuilder.get_service_spec(
service_type, ports, self._frontend_labels, None
)
Expand Down
51 changes: 40 additions & 11 deletions coordinator/gscoordinator/coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@
from gscoordinator.object_manager import ObjectManager
from gscoordinator.op_executor import OperationExecutor
from gscoordinator.utils import GS_GRPC_MAX_MESSAGE_LENGTH
from gscoordinator.utils import check_gremlin_server_ready
from gscoordinator.utils import check_server_ready
from gscoordinator.utils import create_single_op_dag
from gscoordinator.utils import str2bool
from gscoordinator.version import __version__
Expand Down Expand Up @@ -441,10 +441,16 @@ def _match_frontend_endpoint(pattern, lines):
return ""

# frontend endpoint pattern
FRONTEND_PATTERN = re.compile("(?<=FRONTEND_ENDPOINT:).*$")
FRONTEND_GREMLIN_PATTERN = re.compile("(?<=FRONTEND_GREMLIN_ENDPOINT:).*$")
FRONTEND_CYPHER_PATTERN = re.compile("(?<=FRONTEND_CYPHER_ENDPOINT:).*$")
# frontend external endpoint, for clients that are outside of cluster to connect
# only available in kubernetes mode, exposed by NodePort or LoadBalancer
FRONTEND_EXTERNAL_PATTERN = re.compile("(?<=FRONTEND_EXTERNAL_ENDPOINT:).*$")
FRONTEND_EXTERNAL_GREMLIN_PATTERN = re.compile(
"(?<=FRONTEND_EXTERNAL_GREMLIN_ENDPOINT:).*$"
)
FRONTEND_EXTERNAL_CYPHER_PATTERN = re.compile(
"(?<=FRONTEND_EXTERNAL_CYPHER_ENDPOINT:).*$"
)

# create instance
object_id = request.object_id
Expand All @@ -464,13 +470,22 @@ def _match_frontend_endpoint(pattern, lines):
return_code = proc.poll()
if return_code != 0:
raise RuntimeError(f"Error code: {return_code}, message {outs}")
# match frontend endpoint and check for ready
endpoint = _match_frontend_endpoint(FRONTEND_PATTERN, outs)
# match frontend endpoints and check for ready
gremlin_endpoint = _match_frontend_endpoint(FRONTEND_GREMLIN_PATTERN, outs)
cypher_endpoint = _match_frontend_endpoint(FRONTEND_CYPHER_PATTERN, outs)
logger.debug("Got endpoints: %s %s", gremlin_endpoint, cypher_endpoint)
# coordinator use internal endpoint
gie_manager.set_endpoint(endpoint)
if check_gremlin_server_ready(endpoint): # throws TimeoutError
gie_manager.set_endpoint(gremlin_endpoint)
if check_server_ready(
gremlin_endpoint, server="gremlin"
) and check_server_ready(
cypher_endpoint, server="cypher"
): # throws TimeoutError
logger.info(
"Built interactive frontend %s for graph %ld", endpoint, object_id
"Built interactive frontend gremlin: %s & cypher: %s for graph %ld",
gremlin_endpoint,
cypher_endpoint,
object_id,
)
except Exception as e:
context.set_code(grpc.StatusCode.ABORTED)
Expand All @@ -480,11 +495,25 @@ def _match_frontend_endpoint(pattern, lines):
self._launcher.close_interactive_instance(object_id)
self._object_manager.pop(object_id)
return message_pb2.CreateInteractiveInstanceResponse()
external_endpoint = _match_frontend_endpoint(FRONTEND_EXTERNAL_PATTERN, outs)
external_gremlin_endpoint = _match_frontend_endpoint(
FRONTEND_EXTERNAL_GREMLIN_PATTERN, outs
)
external_cypher_endpoint = _match_frontend_endpoint(
FRONTEND_EXTERNAL_CYPHER_PATTERN, outs
)
logger.debug(
"Got external endpoints: %s %s",
external_gremlin_endpoint,
external_cypher_endpoint,
)

# client use external endpoint (k8s mode), or internal endpoint (standalone mode)
endpoint = external_endpoint or endpoint
gremlin_endpoint = external_gremlin_endpoint or gremlin_endpoint
cypher_endpoint = external_cypher_endpoint or cypher_endpoint
return message_pb2.CreateInteractiveInstanceResponse(
gremlin_endpoint=endpoint, object_id=object_id
gremlin_endpoint=gremlin_endpoint,
cypher_endpoint=cypher_endpoint,
object_id=object_id,
)

def CreateLearningInstance(self, request, context):
Expand Down
7 changes: 4 additions & 3 deletions coordinator/gscoordinator/kubernetes_launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -636,12 +636,13 @@ def _distribute_interactive_process(
container,
str(self._interactive_port), # executor port
str(self._interactive_port + 1), # executor rpc port
str(self._interactive_port + 2), # frontend port
str(self._interactive_port + 2), # frontend gremlin port
str(self._interactive_port + 3), # frontend cypher port
self._coordinator_name,
engine_selector,
params,
]
self._interactive_port += 3
self._interactive_port += 4
logger.info("Create GIE instance with command: %s", " ".join(cmd))
process = subprocess.Popen(
cmd,
Expand Down Expand Up @@ -947,7 +948,7 @@ def _create_frontend_deployment(self, name=None, owner_references=None):

def _create_frontend_service(self):
logger.info("Creating frontend service...")
service = self._engine_cluster.get_interactive_frontend_service(8233)
service = self._engine_cluster.get_interactive_frontend_service(8233, 7687)
service.metadata.owner_references = self._owner_references
response = self._core_api.create_namespaced_service(self._namespace, service)
self._resource_object.append(response)
Expand Down
5 changes: 3 additions & 2 deletions coordinator/gscoordinator/local_launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,12 +232,13 @@ def create_interactive_instance(
str(num_workers), # server size
str(self._interactive_port), # executor port
str(self._interactive_port + 1), # executor rpc port
str(self._interactive_port + 2 * num_workers), # frontend port
str(self._interactive_port + 2 * num_workers), # frontend gremlin port
str(self._interactive_port + 2 * num_workers + 1), # frontend cypher port
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Try launch multiple interactive instance, and see if the port will conflict?
I mean, the line 240 may need to change.

self.vineyard_socket,
params,
]
logger.info("Create GIE instance with command: %s", " ".join(cmd))
self._interactive_port += 3
self._interactive_port += 2 * num_workers + 2
process = subprocess.Popen(
cmd,
start_new_session=True,
Expand Down
41 changes: 37 additions & 4 deletions coordinator/gscoordinator/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -1996,33 +1996,64 @@ def check_argument(condition, message=None):
raise ValueError(f"Check failed: {message}")


def check_gremlin_server_ready(endpoint):
def _check_task(endpoint):
def check_server_ready(endpoint, server="gremlin"):
def _check_gremlin_task(endpoint):
from gremlin_python.driver.client import Client

if "MY_POD_NAME" in os.environ:
# inner kubernetes env
if endpoint == "localhost" or endpoint == "127.0.0.1":
# now, used in macOS with docker-desktop kubernetes cluster,
# which external ip is 'localhost' when service type is 'LoadBalancer'
logger.info("In kubernetes env, gremlin server is ready.")
return True

try:
client = Client(f"ws://{endpoint}/gremlin", "g")
# May throw
client.submit("g.V().limit(1)").all().result()
logger.info("Gremlin server is ready.")
finally:
try:
client.close()
except: # noqa: E722
pass
return True

def _check_cypher_task(endpoint):
from neo4j import GraphDatabase

if "MY_POD_NAME" in os.environ:
# inner kubernetes env
if endpoint == "localhost" or endpoint == "127.0.0.1":
logger.info("In kubernetes env, cypher server is ready.")
return True

try:
logger.debug("Try to connect to cypher server.")
driver = GraphDatabase.driver(f"neo4j://{endpoint}", auth=("", ""))
# May throw
driver.verify_connectivity()
logger.info("Checked connectivity to cypher server.")
finally:
try:
driver.close()
except: # noqa: E722
pass
return True

executor = ThreadPoolExecutor(max_workers=20)

begin_time = time.time()
while True:
t = executor.submit(_check_task, endpoint)
if server == "gremlin":
t = executor.submit(_check_gremlin_task, endpoint)
elif server == "cypher":
t = executor.submit(_check_cypher_task, endpoint)
else:
raise ValueError(
f"Unsupported server type: {server} other than 'gremlin' or 'cypher'"
)
try:
_ = t.result(timeout=30)
except Exception as e:
Expand All @@ -2034,4 +2065,6 @@ def _check_task(endpoint):
time.sleep(3)
if time.time() - begin_time > INTERACTIVE_INSTANCE_TIMEOUT_SECONDS:
executor.shutdown(wait=False)
raise TimeoutError(f"Gremlin check query failed: {error_message}")
raise TimeoutError(
f"{server.capitalize()} check query failed: {error_message}"
)
Loading