feat(interactive): Introduce explicit barrier for actors when switching query service to a different graph (#3395)

When the admin service receives a `StartService(graph)` request, the following happens:
1. The actor scope for the current query actors is cancelled.
2. While the scope is being cancelled, and after cancellation completes, all
requests sent to the query_service fail and are rejected.
The HTTP response looks like
```json
{
  "code": 500,
  "message" : "Unable to send message, the target actor has been canceled!"
}
```
3. After the previous graph is closed and the new graph is opened, the new
query actors become available in a new scope.
4. The query service is then ready to serve requests on the new graph.
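
The four steps above amount to a barrier around the query actors. A minimal, self-contained sketch of that behaviour follows. It assumes a hypothetical `QueryServiceSketch` class, which is not the real `HQPSService` API, and it replaces Seastar futures and actor scopes with plain member state:

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <utility>

// Hypothetical stand-in for the query service. All names here are
// illustrative assumptions, not part of the actual code base.
class QueryServiceSketch {
 public:
  struct Response {
    int code;
    std::string message;
  };

  explicit QueryServiceSketch(std::string graph) : graph_(std::move(graph)) {}

  // Step 1: cancel the current scope. The barrier is now up.
  void stop_query_actors() { scope_active_ = false; }

  // Step 3: switch to the new graph and expose actors under a fresh scope id.
  void start_query_actors(std::string new_graph) {
    assert(!scope_active_ && "the old scope must be cancelled first");
    graph_ = std::move(new_graph);
    ++scope_id_;
    scope_active_ = true;
  }

  // Steps 2 and 4: reject while the barrier is up, serve otherwise.
  Response query(const std::string& q) const {
    if (!scope_active_) {
      return {500,
              "Unable to send message, the target actor has been canceled!"};
    }
    return {200, "ran '" + q + "' on " + graph_};
  }

  std::uint32_t scope_id() const { return scope_id_; }

 private:
  std::string graph_;
  std::uint32_t scope_id_ = 0;
  bool scope_active_ = true;
};
```

In this sketch, a `query(...)` issued between `stop_query_actors()` and `start_query_actors(...)` gets the 500 response quoted above, and queries succeed again once the new scope is active.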

Fix #3394
zhanglei1949 authored Dec 5, 2023
1 parent f5ebd08 commit 3c7ace2
Showing 10 changed files with 435 additions and 170 deletions.
15 changes: 14 additions & 1 deletion docs/flex/interactive/development/admin_service.md
@@ -942,7 +942,20 @@ curl -X DELETE -H "Content-Type: application/json" "http://[host]/v1/graph/{grap

#### Description

Start the query service on a graph.
Start the query service on a graph. The `graph_name` parameter may be empty, in which case the service restarts on the currently running graph.

1. After the AdminService receives this request, the current actor scope for the query actors is cancelled.
2. While the scope is being cancelled, and after cancellation completes, all requests sent to the query_service fail and are rejected.
The HTTP response looks like
```json
{
"code": 500,
"message" : "Unable to send message, the target actor has been canceled!"
}
```
3. After the previous graph is closed and the new graph is opened, the new query actors become available in a new scope.
4. The query service is then ready to serve requests on the new graph.


#### HTTP Request
- **Method**: POST
4 changes: 2 additions & 2 deletions flex/bin/bulk_loader.cc
@@ -85,8 +85,8 @@ int main(int argc, char** argv) {
}
std::filesystem::path serial_path = data_dir_path / "schema";
if (std::filesystem::exists(serial_path)) {
LOG(ERROR) << "data directory is not empty";
return -1;
LOG(WARNING) << "data directory is not empty";
return 0;
}

auto loader = gs::LoaderFactory::CreateFragmentLoader(
67 changes: 39 additions & 28 deletions flex/engines/http_server/actor/admin_actor.act.cc
@@ -277,7 +277,8 @@ seastar::future<query_result> admin_actor::update_procedure(
}
}

// Manage the service of one graph.
// Starting service on a graph means first stopping all currently running
// actors, then switching the graph and creating new actors with an unused
// scope_id.
seastar::future<query_result> admin_actor::start_service(
query_param&& query_param) {
// parse query_param.content as json and get graph_name
@@ -296,49 +297,59 @@ seastar::future<query_result> admin_actor::start_service(
<< server::WorkDirManipulator::GetRunningGraph();
}
LOG(WARNING) << "Starting service with graph: " << graph_name;
} catch (std::exception& e) {
LOG(ERROR) << "Fail to Start service: ";
return seastar::make_exception_future<query_result>(
std::runtime_error(e.what()));
}

auto schema_result = server::WorkDirManipulator::GetGraphSchema(graph_name);
if (!schema_result.ok()) {
LOG(ERROR) << "Fail to get graph schema: "
<< schema_result.status().error_message() << ", " << graph_name;
return seastar::make_exception_future<query_result>(std::runtime_error(
"Fail to get graph schema: " + schema_result.status().error_message() +
", " + graph_name));
}
auto& schema_value = schema_result.value();
auto data_dir = server::WorkDirManipulator::GetDataDirectory(graph_name);
if (!data_dir.ok()) {
LOG(ERROR) << "Fail to get data directory: "
<< data_dir.status().error_message();
return seastar::make_exception_future<query_result>(std::runtime_error(
"Fail to get data directory: " + data_dir.status().error_message()));
}
auto data_dir_value = data_dir.value();

// First Stop query_handler's actors.

auto& hqps_service = HQPSService::get();
return hqps_service.stop_query_actors().then([this, graph_name, schema_value,
data_dir_value, &hqps_service] {
LOG(INFO) << "Successfully stopped query handler";

auto schema_result = server::WorkDirManipulator::GetGraphSchema(graph_name);
if (!schema_result.ok()) {
LOG(ERROR) << "Fail to get graph schema: "
<< schema_result.status().error_message() << ", "
<< graph_name;
return seastar::make_exception_future<query_result>(std::runtime_error(
"Fail to get graph schema: " +
schema_result.status().error_message() + ", " + graph_name));
}
auto data_dir = server::WorkDirManipulator::GetDataDirectory(graph_name);
if (!data_dir.ok()) {
LOG(ERROR) << "Fail to get data directory: "
<< data_dir.status().error_message();
return seastar::make_exception_future<query_result>(std::runtime_error(
"Fail to get data directory: " + data_dir.status().error_message()));
}
{
std::lock_guard<std::mutex> lock(mtx_);
auto& db = gs::GraphDB::get();
LOG(INFO) << "Update service running on graph:" << graph_name;
auto& schema_value = schema_result.value();

// use the previous thread num
auto thread_num = db.SessionNum();
db.Close();
if (!db.Open(schema_value, data_dir.value(), thread_num).ok()) {
if (!db.Open(schema_value, data_dir_value, thread_num).ok()) {
LOG(ERROR) << "Fail to load graph from data directory: "
<< data_dir.value();
<< data_dir_value;
return seastar::make_exception_future<query_result>(std::runtime_error(
"Fail to load graph from data directory: " + data_dir.value()));
"Fail to load graph from data directory: " + data_dir_value));
}
server::WorkDirManipulator::SetRunningGraph(graph_name);
}

hqps_service.start_query_actors(); // start on a new scope.
LOG(INFO) << "Successfully restart query actors";
LOG(INFO) << "Successfully started service with graph: " << graph_name;

return seastar::make_ready_future<query_result>(
"Successfully start service");
} catch (std::exception& e) {
LOG(ERROR) << "Fail to Start service: ";
return seastar::make_exception_future<query_result>(
std::runtime_error(e.what()));
}
});
}

// get service status
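
As the hunk above appears to read (the +/- markers are not preserved here), the schema and data directory lookups now run before the query actors are stopped. A request naming an unknown graph would then fail without interrupting the running service. Below is a minimal sketch of that ordering. It uses hypothetical `SwitchHooks` callbacks in place of `WorkDirManipulator`, `GraphDB`, and `HQPSService`, and plain synchronous calls in place of Seastar futures:

```cpp
#include <functional>
#include <optional>
#include <string>

// Illustrative result type; the real code returns seastar::future<query_result>.
struct SwitchResult {
  bool ok;
  std::string message;
};

// Hypothetical hooks standing in for the real collaborators.
struct SwitchHooks {
  std::function<std::optional<std::string>(const std::string&)> get_schema;
  std::function<std::optional<std::string>(const std::string&)> get_data_dir;
  std::function<void()> stop_query_actors;
  std::function<bool(const std::string&, const std::string&)> reopen_db;
  std::function<void()> start_query_actors;
};

SwitchResult switch_graph(const std::string& graph_name,
                          const SwitchHooks& hooks) {
  // Validate everything that can fail cheaply while the old actors still run.
  auto schema = hooks.get_schema(graph_name);
  if (!schema) {
    return {false, "Fail to get graph schema: " + graph_name};
  }
  auto data_dir = hooks.get_data_dir(graph_name);
  if (!data_dir) {
    return {false, "Fail to get data directory: " + graph_name};
  }

  // Only now raise the barrier: cancel the current actor scope.
  hooks.stop_query_actors();

  // Close the old graph and open the new one.
  if (!hooks.reopen_db(*schema, *data_dir)) {
    // Mirrors the diff: on failure the actors are not restarted here.
    return {false, "Fail to load graph from data directory: " + *data_dir};
  }

  // Lower the barrier: new actors come up in a new scope.
  hooks.start_query_actors();
  return {true, "Successfully start service"};
}
```

With this ordering, the only failure that leaves the service without query actors is a failed reopen of the database, which matches the early return in the hunk.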