Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(interactive): Refactor StopService API for Interactive SDKs #4271

Merged
merged 12 commits into from
Oct 9, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion coordinator/gscoordinator/flex/core/interactive/hqps.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from gs_interactive.models.create_procedure_request import CreateProcedureRequest
from gs_interactive.models.schema_mapping import SchemaMapping
from gs_interactive.models.start_service_request import StartServiceRequest
from gs_interactive.models.stop_service_request import StopServiceRequest
from gs_interactive.models.update_procedure_request import UpdateProcedureRequest

from gscoordinator.flex.core.config import CLUSTER_TYPE
Expand Down Expand Up @@ -263,7 +264,7 @@ def stop_service(self) -> str:
gs_interactive.Configuration(self._hqps_endpoint)
) as api_client:
api_instance = gs_interactive.AdminServiceServiceManagementApi(api_client)
return api_instance.stop_service()
return api_instance.stop_service(StopServiceRequest())

def restart_service(self) -> str:
with gs_interactive.ApiClient(
Expand Down
2 changes: 1 addition & 1 deletion docs/flex/interactive/development/python/python_sdk.md
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ Currently Interactive forbid deleting a graph which is currently serving in the

```python
# stop the service first
resp = sess.stop_service()
resp = sess.stop_service(graph_id)
assert resp.is_ok()
print("successfully stopped the service")

Expand Down
34 changes: 33 additions & 1 deletion flex/engines/http_server/actor/admin_actor.act.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1033,15 +1033,47 @@ seastar::future<admin_query_result> admin_actor::start_service(
// The port is still connectable.
seastar::future<admin_query_result> admin_actor::stop_service(
query_param&& query_param) {
// Try to get the json content from query_param
std::string graph_id = "";
try {
auto& content = query_param.content;
if (!content.empty()) {
rapidjson::Document json;
if (json.Parse(content.c_str()).HasParseError()) {
throw std::runtime_error("Fail to parse json: " +
std::to_string(json.GetParseError()));
}
if (json.HasMember("graph_id")) {
graph_id = json["graph_id"].GetString();
}
LOG(INFO) << "Stop service with graph: " << graph_id;
}
} catch (std::exception& e) {
LOG(ERROR) << "Fail to Start service: ";
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fail to Stop service?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

return seastar::make_ready_future<admin_query_result>(
gs::Result<seastar::sstring>(
gs::Status(gs::StatusCode::BAD_REQUEST,
"Fail to parse json: " + std::string(e.what()))));
}

auto& graph_db_service = GraphDBService::get();
return graph_db_service.stop_query_actors().then([this] {
return graph_db_service.stop_query_actors().then([this, graph_id] {
LOG(INFO) << "Successfully stopped query handler";
// Add also remove current running graph
{
std::lock_guard<std::mutex> lock(mtx_);
// unlock the graph
auto cur_running_graph_res = metadata_store_->GetRunningGraph();
if (cur_running_graph_res.ok()) {
if (!graph_id.empty() && graph_id != cur_running_graph_res.value()) {
LOG(ERROR) << "The specified graph is not running: "
<< cur_running_graph_res.value();
return seastar::make_ready_future<admin_query_result>(
gs::Result<seastar::sstring>(
gs::Status(gs::StatusCode::NOT_FOUND,
"The graph is not running: " +
cur_running_graph_res.value())));
}
auto unlock_res =
metadata_store_->UnlockGraphIndices(cur_running_graph_res.value());
if (!unlock_res.ok()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.alibaba.graphscope.interactive.client.common.Result;
import com.alibaba.graphscope.interactive.models.ServiceStatus;
import com.alibaba.graphscope.interactive.models.StartServiceRequest;
import com.alibaba.graphscope.interactive.models.StopServiceRequest;

/**
* Manage the query interface.
Expand All @@ -29,5 +30,5 @@ public interface QueryServiceInterface {

Result<String> startService(StartServiceRequest service);

Result<String> stopService();
Result<String> stopService(StopServiceRequest graphId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -801,9 +801,9 @@ public Result<String> startService(StartServiceRequest service) {
}

@Override
public Result<String> stopService() {
public Result<String> stopService(StopServiceRequest request) {
try {
ApiResponse<String> response = serviceApi.stopServiceWithHttpInfo();
ApiResponse<String> response = serviceApi.stopServiceWithHttpInfo(request);
return Result.fromResponse(response);
} catch (ApiException e) {
e.printStackTrace();
Expand Down
9 changes: 6 additions & 3 deletions flex/interactive/sdk/python/gs_interactive/client/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ def start_service(
raise NotImplementedError

@abstractmethod
def stop_service(self) -> Result[str]:
def stop_service(self, graph_id: str) -> Result[str]:
raise NotImplementedError

@abstractmethod
Expand Down Expand Up @@ -656,9 +656,12 @@ def start_service(
except Exception as e:
return Result.from_exception(e)

def stop_service(self) -> Result[str]:
def stop_service(self, graph_id : str = None) -> Result[str]:
try:
response = self._service_api.stop_service_with_http_info()
req = StopServiceRequest()
if graph_id:
req.graph_id = graph_id
response = self._service_api.stop_service_with_http_info(req)
return Result.from_response(response)
except Exception as e:
return Result.from_exception(e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,4 +135,10 @@ def test_builtin_procedure(interactive_session,neo4j_session, create_modern_grap
# Call the builtin procedure
start_service_on_graph(interactive_session, create_modern_graph)
call_procedure(neo4j_session, create_modern_graph, "count_vertices", '"person"')


def test_stop_service(interactive_session, create_modern_graph):
print("[Test stop service]")
start_service_on_graph(interactive_session, create_modern_graph)
# stop the service
stop_res = interactive_session.stop_service(graph_id = "A Invalid graph id")
assert not stop_res.is_ok()
13 changes: 13 additions & 0 deletions flex/openapi/openapi_interactive.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,12 @@ paths:
- AdminService/ServiceManagement
description: Stop current service
operationId: stop_service
requestBody:
description: Stop service on a specified graph
content:
application/json:
schema:
$ref: '#/components/schemas/StopServiceRequest'
responses:
'200':
description: successful operation
Expand Down Expand Up @@ -2015,6 +2021,13 @@ components:
properties:
graph_id:
type: string
StopServiceRequest:
x-body-name: stop_service_request
properties:
graph_id:
type: string
nullable: true
additionalProperties: false
ServiceStatus:
x-body-name: service_status
type: object
Expand Down
Loading