Skip to content

Commit

Permalink
fix(interactive): support non-blocking data loading interface in coordinator and fix failure during flexbuild process (#3530)
Browse files Browse the repository at this point in the history
  • Loading branch information
lidongze0629 authored Feb 2, 2024
1 parent 3e85935 commit d29516b
Show file tree
Hide file tree
Showing 41 changed files with 3,456 additions and 1,070 deletions.
1 change: 1 addition & 0 deletions flex/coordinator/.openapi-generator-ignore
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,4 @@

gs_flex_coordinator/controllers/*
setup.py
requirements.txt
2 changes: 1 addition & 1 deletion flex/coordinator/.openapi-generator/FILES
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ gs_flex_coordinator/models/groot_graph_gremlin_interface.py
gs_flex_coordinator/models/groot_property.py
gs_flex_coordinator/models/groot_schema.py
gs_flex_coordinator/models/groot_vertex_type.py
gs_flex_coordinator/models/job_status.py
gs_flex_coordinator/models/model_property.py
gs_flex_coordinator/models/model_schema.py
gs_flex_coordinator/models/node_status.py
Expand All @@ -54,6 +55,5 @@ gs_flex_coordinator/openapi/openapi.yaml
gs_flex_coordinator/test/__init__.py
gs_flex_coordinator/typing_utils.py
gs_flex_coordinator/util.py
requirements.txt
test-requirements.txt
tox.ini
68 changes: 68 additions & 0 deletions flex/coordinator/gs_flex_coordinator/controllers/job_controller.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import connexion
from typing import Dict
from typing import Tuple
from typing import Union

from gs_flex_coordinator.core import client_wrapper
from gs_flex_coordinator.core import handle_api_exception
from gs_flex_coordinator.models.job_status import JobStatus # noqa: E501
from gs_flex_coordinator.models.schema_mapping import SchemaMapping # noqa: E501
from gs_flex_coordinator import util


@handle_api_exception()
def create_dataloading_job(graph_name, schema_mapping):  # noqa: E501
    """Submit a data-loading job for the named graph.

     # noqa: E501

    :param graph_name: name of the graph to load data into
    :type graph_name: str
    :param schema_mapping: description of the data sources to load
    :type schema_mapping: dict | bytes

    :rtype: Union[str, Tuple[str, int], Tuple[str, int, Dict[str, str]]
    """
    body = schema_mapping
    if connexion.request.is_json:
        # Re-parse the JSON payload into the generated model type.
        body = SchemaMapping.from_dict(connexion.request.get_json())  # noqa: E501
    return client_wrapper.create_dataloading_job(graph_name, body)


@handle_api_exception()
def delete_job_by_id(job_id):  # noqa: E501
    """Cancel/remove the job identified by ``job_id``.

     # noqa: E501

    :param job_id: identifier of the job to delete
    :type job_id: str

    :rtype: Union[str, Tuple[str, int], Tuple[str, int, Dict[str, str]]
    """
    # Pure delegation; error translation is handled by the decorator.
    return client_wrapper.delete_job_by_id(job_id)


@handle_api_exception()
def get_job_by_id(job_id):  # noqa: E501
    """Return the status of the job identified by ``job_id``.

     # noqa: E501

    :param job_id: identifier of the job to look up
    :type job_id: str

    :rtype: Union[JobStatus, Tuple[JobStatus, int], Tuple[JobStatus, int, Dict[str, str]]
    """
    # Pure delegation; error translation is handled by the decorator.
    return client_wrapper.get_job_by_id(job_id)


@handle_api_exception()
def list_jobs():  # noqa: E501
    """List the status of all known jobs.

     # noqa: E501

    :rtype: Union[List[JobStatus], Tuple[List[JobStatus], int], Tuple[List[JobStatus], int, Dict[str, str]]
    """
    # Pure delegation; error translation is handled by the decorator.
    return client_wrapper.list_jobs()
Original file line number Diff line number Diff line change
Expand Up @@ -10,23 +10,6 @@
from gs_flex_coordinator.models.schema_mapping import SchemaMapping # noqa: E501
from gs_flex_coordinator import util

@handle_api_exception()
def data_import(graph_name, schema_mapping): # noqa: E501
"""data_import
# noqa: E501
:param graph_name:
:type graph_name: str
:param schema_mapping:
:type schema_mapping: dict | bytes
:rtype: Union[str, Tuple[str, int], Tuple[str, int, Dict[str, str]]
"""
if connexion.request.is_json:
schema_mapping = SchemaMapping.from_dict(connexion.request.get_json()) # noqa: E501
return client_wrapper.data_import(graph_name, schema_mapping)


@handle_api_exception()
def get_groot_schema(graph_name): # noqa: E501
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ def collect(self, message: AlertMessage):
message.trigger_time
)

logger.info("Alert message generated: %s", str(message.to_dict()))
# logger.info("Alert message generated: %s", str(message.to_dict()))

# add message to current collector
self._current_message_collector.add_message(message)
Expand Down
40 changes: 27 additions & 13 deletions flex/coordinator/gs_flex_coordinator/core/client_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,22 @@
# limitations under the License.
#

import datetime
import itertools
import logging
import socket
import threading
from typing import List, Union

import psutil
from gs_flex_coordinator.core.config import CLUSTER_TYPE, INSTANCE_NAME, SOLUTION
from gs_flex_coordinator.core.config import (CLUSTER_TYPE, INSTANCE_NAME,
SOLUTION)
from gs_flex_coordinator.core.interactive import init_hqps_client
from gs_flex_coordinator.models import (
DeploymentInfo,
Graph,
ModelSchema,
NodeStatus,
Procedure,
ServiceStatus,
StartServiceRequest,
)
from gs_flex_coordinator.core.utils import encode_datetime
from gs_flex_coordinator.models import (DeploymentInfo, Graph, JobStatus,
ModelSchema, NodeStatus, Procedure,
SchemaMapping, ServiceStatus,
StartServiceRequest)
from gs_flex_coordinator.version import __version__

logger = logging.getLogger("graphscope")
Expand Down Expand Up @@ -132,7 +130,23 @@ def restart_service(self) -> str:
def start_service(self, request: StartServiceRequest) -> str:
return self._client.start_service(request)

def data_import(self, graph_name, schema_mapping) -> str:
def list_jobs(self) -> List[JobStatus]:
    """Return every job known to the backend as a ``JobStatus`` model.

    The underlying client yields plain dicts; each one is converted
    into the coordinator's generated ``JobStatus`` model.

    :return: list of job statuses (possibly empty).
    """
    # Comprehension replaces the manual append loop (same order, same items).
    return [
        JobStatus.from_dict(job_status_dict)
        for job_status_dict in self._client.list_jobs()
    ]

def get_job_by_id(self, job_id: str) -> JobStatus:
    """Fetch one job's status dict from the backend and wrap it in the
    generated ``JobStatus`` model.

    :param job_id: identifier of the job to look up.
    """
    return JobStatus.from_dict(self._client.get_job_by_id(job_id))

def delete_job_by_id(self, job_id: str) -> str:
    """Delegate job deletion to the backend client.

    :param job_id: identifier of the job to delete.
    :return: the backend's response message.
    """
    result = self._client.delete_job_by_id(job_id)
    return result

def create_dataloading_job(
self, graph_name: str, schema_mapping: SchemaMapping
) -> str:
# there are some tricks here, since property is a keyword of openapi
# specification, so it will be converted into the _property field.
schema_mapping_dict = schema_mapping.to_dict()
Expand All @@ -142,8 +156,8 @@ def data_import(self, graph_name, schema_mapping) -> str:
for column_mapping in mapping["column_mappings"]:
if "_property" in column_mapping:
column_mapping["property"] = column_mapping.pop("_property")
print(schema_mapping_dict)
return self._client.data_import(graph_name, schema_mapping_dict)
job_id = self._client.create_dataloading_job(graph_name, schema_mapping_dict)
return job_id


client_wrapper = ClientWrapper()
75 changes: 55 additions & 20 deletions flex/coordinator/gs_flex_coordinator/core/interactive/hqps.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,27 +16,20 @@
# limitations under the License.
#

import datetime
import logging
import os
from typing import List, Union

from gs_flex_coordinator.core.config import (
CLUSTER_TYPE,
HQPS_ADMIN_SERVICE_PORT,
WORKSPACE,
)
from gs_flex_coordinator.core.utils import get_internal_ip
from gs_flex_coordinator.models import StartServiceRequest

import hqps_client
from hqps_client import (
Graph,
ModelSchema,
NodeStatus,
Procedure,
SchemaMapping,
Service,
)
from hqps_client import (Graph, JobResponse, JobStatus, ModelSchema, Procedure,
SchemaMapping, Service)

from gs_flex_coordinator.core.config import (CLUSTER_TYPE,
HQPS_ADMIN_SERVICE_PORT,
WORKSPACE)
from gs_flex_coordinator.core.utils import encode_datetime, get_internal_ip
from gs_flex_coordinator.models import StartServiceRequest

logger = logging.getLogger("graphscope")

Expand Down Expand Up @@ -170,15 +163,57 @@ def start_service(self, request: StartServiceRequest) -> str:
Service.from_dict({"graph_name": request.graph_name})
)

def data_import(self, graph_name: str, schema_mapping: dict) -> str:
print(graph_name, schema_mapping)
def list_jobs(self) -> List[dict]:
    """List all jobs on the HQPS admin service as plain dicts.

    The service reports epoch timestamps in milliseconds; both
    ``start_time`` and ``end_time`` are converted to human-readable
    strings via ``encode_datetime``.

    :return: list of job-status dicts (possibly empty).
    """

    def _normalize_times(job_status: dict) -> dict:
        # "end_time" is absent while a job is still running; guard both
        # keys (and None values) so normalization never raises.
        for key in ("start_time", "end_time"):
            if key in job_status and job_status[key] is not None:
                job_status[key] = encode_datetime(
                    datetime.datetime.fromtimestamp(job_status[key] / 1000)
                )
        return job_status

    with hqps_client.ApiClient(
        hqps_client.Configuration(self._hqps_endpoint)
    ) as api_client:
        api_instance = hqps_client.JobApi(api_client)
        return [_normalize_times(s.to_dict()) for s in api_instance.list_jobs()]

def get_job_by_id(self, job_id: str) -> dict:
    """Fetch one job's status from the HQPS admin service.

    :param job_id: identifier of the job.
    :return: the job status as a dict with timestamps normalized from
        epoch milliseconds to human-readable strings.
    """
    with hqps_client.ApiClient(
        hqps_client.Configuration(self._hqps_endpoint)
    ) as api_client:
        api_instance = hqps_client.JobApi(api_client)
        job_status = api_instance.get_job_by_id(job_id).to_dict()
    # "end_time" is absent while the job is still running; guard both
    # keys (and None values) so normalization never raises.
    for key in ("start_time", "end_time"):
        if key in job_status and job_status[key] is not None:
            job_status[key] = encode_datetime(
                datetime.datetime.fromtimestamp(job_status[key] / 1000)
            )
    return job_status

def delete_job_by_id(self, job_id: str) -> str:
    """Ask the HQPS admin service to delete the given job.

    :param job_id: identifier of the job to delete.
    :return: the service's response message.
    """
    config = hqps_client.Configuration(self._hqps_endpoint)
    with hqps_client.ApiClient(config) as api_client:
        return hqps_client.JobApi(api_client).delete_job_by_id(job_id)

def create_dataloading_job(self, graph_name: str, schema_mapping: dict) -> str:
    """Submit a bulk data-loading job for *graph_name*.

    Fix: the previous annotation declared ``-> JobResponse`` although the
    method returns ``response.job_id`` (a string); the annotation now
    matches the actual return value and ClientWrapper's ``-> str`` contract.

    :param graph_name: name of the graph to load data into.
    :param schema_mapping: plain-dict schema mapping, converted into the
        generated ``SchemaMapping`` model before submission.
    :return: the identifier of the newly created job.
    """
    config = hqps_client.Configuration(self._hqps_endpoint)
    with hqps_client.ApiClient(config) as api_client:
        job_api = hqps_client.JobApi(api_client)
        response = job_api.create_dataloading_job(
            graph_name, SchemaMapping.from_dict(schema_mapping)
        )
    return response.job_id


def init_hqps_client():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@
__version__ = "1.0.0"

# import apis into sdk package
from hqps_client.api.dataloading_api import DataloadingApi
from hqps_client.api.graph_api import GraphApi
from hqps_client.api.node_api import NodeApi
from hqps_client.api.job_api import JobApi
from hqps_client.api.procedure_api import ProcedureApi
from hqps_client.api.service_api import ServiceApi

Expand All @@ -47,9 +46,10 @@
from hqps_client.models.edge_type_vertex_type_pair_relations_inner_x_csr_params import EdgeTypeVertexTypePairRelationsInnerXCsrParams
from hqps_client.models.graph import Graph
from hqps_client.models.graph_stored_procedures import GraphStoredProcedures
from hqps_client.models.job_response import JobResponse
from hqps_client.models.job_status import JobStatus
from hqps_client.models.model_property import ModelProperty
from hqps_client.models.model_schema import ModelSchema
from hqps_client.models.node_status import NodeStatus
from hqps_client.models.procedure import Procedure
from hqps_client.models.procedure_params_inner import ProcedureParamsInner
from hqps_client.models.property_property_type import PropertyPropertyType
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
# flake8: noqa

# import apis into api package
from hqps_client.api.dataloading_api import DataloadingApi
from hqps_client.api.graph_api import GraphApi
from hqps_client.api.node_api import NodeApi
from hqps_client.api.job_api import JobApi
from hqps_client.api.procedure_api import ProcedureApi
from hqps_client.api.service_api import ServiceApi

Loading

0 comments on commit d29516b

Please sign in to comment.