Skip to content

Commit

Permalink
add annotation
Browse files Browse the repository at this point in the history
  • Loading branch information
jackwener committed Jan 14, 2022
1 parent 4a99a30 commit 73d81e9
Show file tree
Hide file tree
Showing 11 changed files with 47 additions and 44 deletions.
9 changes: 2 additions & 7 deletions src/common/memory/MemoryUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,6 @@
#include <folly/String.h>
#include <gflags/gflags.h>

#include <algorithm>
#include <cstdio>
#include <fstream>
#include <regex>

#include "common/fs/FileUtils.h"

DEFINE_bool(containerized, false, "Whether run this process inside the docker container");
Expand Down Expand Up @@ -42,7 +37,7 @@ StatusOr<bool> MemoryUtils::hitsHighWatermark() {
uint64_t cacheSize = 0;
for (; iter.valid(); ++iter) {
auto& sm = iter.matched();
cacheSize += std::stoul(sm[2].str(), NULL);
cacheSize += std::stoul(sm[2].str(), nullptr);
}

std::string limitPath =
Expand All @@ -64,7 +59,7 @@ StatusOr<bool> MemoryUtils::hitsHighWatermark() {
std::vector<uint64_t> memorySize;
for (; iter.valid(); ++iter) {
auto& sm = iter.matched();
memorySize.emplace_back(std::stoul(sm[2].str(), NULL) << 10);
memorySize.emplace_back(std::stoul(sm[2].str(), nullptr) << 10);
}
std::sort(memorySize.begin(), memorySize.end());
if (memorySize.size() >= 2u) {
Expand Down
1 change: 1 addition & 0 deletions src/graph/service/Authenticator.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
namespace nebula {
namespace graph {

// interface for authentication
class Authenticator {
public:
virtual ~Authenticator() = default;
Expand Down
6 changes: 1 addition & 5 deletions src/graph/service/CloudAuthenticator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,7 @@ Status CloudAuthenticator::auth(const std::string& user, const std::string& pass
std::string header = R"(-H "Content-Type: application/json" -H "Authorization:Nebula )";
header = header + base64Str + "\"";
auto result = http::HttpClient::post(FLAGS_cloud_http_url, header);

if (!result.ok()) {
LOG(ERROR) << result.status();
return result.status();
}
NG_LOG_AND_RETURN_IF_ERROR(result);

try {
auto json = folly::parseJson(result.value());
Expand Down
1 change: 1 addition & 0 deletions src/graph/service/GraphServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ bool GraphServer::start() {
return false;
}

// init worker id for snowflake generating unique id
nebula::Snowflake::initWorkerId(interface->metaClient_.get());

graphThread_ = std::make_unique<std::thread>([&] {
Expand Down
27 changes: 14 additions & 13 deletions src/graph/service/GraphService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "graph/service/GraphService.h"

#include <boost/filesystem.hpp>
#include <memory>

#include "clients/storage/StorageClient.h"
#include "common/base/Base.h"
Expand Down Expand Up @@ -72,7 +73,7 @@ folly::Future<AuthResponse> GraphService::future_authenticate(const std::string&
auto authResult = auth(username, password);
if (!authResult.ok()) {
ctx->resp().errorCode = ErrorCode::E_BAD_USERNAME_PASSWORD;
ctx->resp().errorMsg.reset(new std::string(authResult.toString()));
ctx->resp().errorMsg = std::make_unique<std::string>(authResult.toString());
ctx->finish();
stats::StatsManager::addValue(kNumAuthFailedSessions);
stats::StatsManager::addValue(kNumAuthFailedSessionsBadUserNamePassword);
Expand All @@ -81,7 +82,7 @@ folly::Future<AuthResponse> GraphService::future_authenticate(const std::string&

if (!sessionManager_->isOutOfConnections()) {
ctx->resp().errorCode = ErrorCode::E_TOO_MANY_CONNECTIONS;
ctx->resp().errorMsg.reset(new std::string("Too many connections in the cluster"));
ctx->resp().errorMsg = std::make_unique<std::string>("Too many connections in the cluster");
ctx->finish();
stats::StatsManager::addValue(kNumAuthFailedSessions);
stats::StatsManager::addValue(kNumAuthFailedSessionsOutOfMaxAllowed);
Expand All @@ -95,24 +96,24 @@ folly::Future<AuthResponse> GraphService::future_authenticate(const std::string&
LOG(ERROR) << "Create session for userName: " << user << ", ip: " << cIp
<< " failed: " << ret.status();
ctx->resp().errorCode = ErrorCode::E_SESSION_INVALID;
ctx->resp().errorMsg.reset(new std::string(ret.status().toString()));
ctx->resp().errorMsg = std::make_unique<std::string>(ret.status().toString());
return ctx->finish();
}
auto sessionPtr = std::move(ret).value();
if (sessionPtr == nullptr) {
LOG(ERROR) << "Get session for sessionId is nullptr";
ctx->resp().errorCode = ErrorCode::E_SESSION_INVALID;
ctx->resp().errorMsg.reset(new std::string("Get session for sessionId is nullptr"));
ctx->resp().errorMsg = std::make_unique<std::string>("Get session for sessionId is nullptr");
return ctx->finish();
}
stats::StatsManager::addValue(kNumOpenedSessions);
stats::StatsManager::addValue(kNumActiveSessions);
ctx->setSession(sessionPtr);
ctx->resp().sessionId.reset(new int64_t(ctx->session()->id()));
ctx->resp().timeZoneOffsetSeconds.reset(
new int32_t(time::Timezone::getGlobalTimezone().utcOffsetSecs()));
ctx->resp().timeZoneName.reset(
new std::string(time::Timezone::getGlobalTimezone().stdZoneName()));
ctx->resp().sessionId = std::make_unique<int64_t>(ctx->session()->id());
ctx->resp().timeZoneOffsetSeconds =
std::make_unique<int32_t>(time::Timezone::getGlobalTimezone().utcOffsetSecs());
ctx->resp().timeZoneName =
std::make_unique<std::string>(time::Timezone::getGlobalTimezone().stdZoneName());
return ctx->finish();
};

Expand Down Expand Up @@ -147,16 +148,16 @@ folly::Future<ExecutionResponse> GraphService::future_executeWithParameter(
if (!ret.ok()) {
LOG(ERROR) << "Get session for sessionId: " << sessionId << " failed: " << ret.status();
ctx->resp().errorCode = ErrorCode::E_SESSION_INVALID;
ctx->resp().errorMsg.reset(new std::string(folly::stringPrintf(
"Get sessionId[%ld] failed: %s", sessionId, ret.status().toString().c_str())));
ctx->resp().errorMsg = std::make_unique<std::string>(folly::stringPrintf(
"Get sessionId[%ld] failed: %s", sessionId, ret.status().toString().c_str()));
return ctx->finish();
}
auto sessionPtr = std::move(ret).value();
if (sessionPtr == nullptr) {
LOG(ERROR) << "Get session for sessionId: " << sessionId << " is nullptr";
ctx->resp().errorCode = ErrorCode::E_SESSION_INVALID;
ctx->resp().errorMsg.reset(
new std::string(folly::stringPrintf("SessionId[%ld] does not exist", sessionId)));
ctx->resp().errorMsg = std::make_unique<std::string>(
folly::stringPrintf("SessionId[%ld] does not exist", sessionId));
return ctx->finish();
}
stats::StatsManager::addValue(kNumQueries);
Expand Down
2 changes: 1 addition & 1 deletion src/graph/service/GraphService.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ namespace graph {
class GraphService final : public cpp2::GraphServiceSvIf {
public:
GraphService() = default;
~GraphService() = default;
~GraphService() override = default;

Status NG_MUST_USE_RESULT init(std::shared_ptr<folly::IOThreadPoolExecutor> ioExecutor,
const HostAddr& hostAddr);
Expand Down
24 changes: 10 additions & 14 deletions src/graph/service/PermissionManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@
namespace nebula {
namespace graph {

// static
Status PermissionManager::canReadSpace(ClientSession *session, GraphSpaceID spaceId) {
/* static */ Status PermissionManager::canReadSpace(ClientSession *session, GraphSpaceID spaceId) {
if (!FLAGS_enable_authorize) {
return Status::OK();
}
Expand All @@ -35,8 +34,8 @@ Status PermissionManager::canReadSpace(ClientSession *session, GraphSpaceID spac
return Status::PermissionError("No permission to read space.");
}

// static
Status PermissionManager::canReadSchemaOrData(ClientSession *session, ValidateContext *vctx) {
/* static */ Status PermissionManager::canReadSchemaOrData(ClientSession *session,
ValidateContext *vctx) {
if (!FLAGS_enable_authorize) {
return Status::OK();
}
Expand All @@ -60,8 +59,7 @@ Status PermissionManager::canReadSchemaOrData(ClientSession *session, ValidateCo
return Status::PermissionError("No permission to read schema/data.");
}

// static
Status PermissionManager::canWriteSpace(ClientSession *session) {
/* static */ Status PermissionManager::canWriteSpace(ClientSession *session) {
if (!FLAGS_enable_authorize) {
return Status::OK();
}
Expand All @@ -71,8 +69,8 @@ Status PermissionManager::canWriteSpace(ClientSession *session) {
return Status::PermissionError("No permission to write space.");
}

// static
Status PermissionManager::canWriteSchema(ClientSession *session, ValidateContext *vctx) {
/* static */ Status PermissionManager::canWriteSchema(ClientSession *session,
ValidateContext *vctx) {
if (!FLAGS_enable_authorize) {
return Status::OK();
}
Expand All @@ -97,8 +95,7 @@ Status PermissionManager::canWriteSchema(ClientSession *session, ValidateContext
return Status::PermissionError("No permission to write schema.");
}

// static
Status PermissionManager::canWriteUser(ClientSession *session) {
/* static */ Status PermissionManager::canWriteUser(ClientSession *session) {
if (!FLAGS_enable_authorize) {
return Status::OK();
}
Expand All @@ -113,8 +110,8 @@ Status PermissionManager::canWriteUser(ClientSession *session) {
}
}

// static
Status PermissionManager::canReadUser(ClientSession *session, const std::string &targetUser) {
/* static */ Status PermissionManager::canReadUser(ClientSession *session,
const std::string &targetUser) {
if (!FLAGS_enable_authorize) {
return Status::OK();
}
Expand Down Expand Up @@ -177,8 +174,7 @@ Status PermissionManager::canWriteRole(ClientSession *session,
targetUser.c_str());
}

// static
Status PermissionManager::canWriteData(ClientSession *session, ValidateContext *vctx) {
/* static */ Status PermissionManager::canWriteData(ClientSession *session, ValidateContext *vctx) {
if (!FLAGS_enable_authorize) {
return Status::OK();
}
Expand Down
1 change: 1 addition & 0 deletions src/graph/service/PermissionManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
namespace nebula {
namespace graph {

// This module is responsible for checking the permission of the user
class PermissionManager final {
public:
PermissionManager() = delete;
Expand Down
2 changes: 2 additions & 0 deletions src/graph/service/QueryEngine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ DEFINE_int32(check_memory_interval_in_secs, 1, "Memory check interval in seconds
namespace nebula {
namespace graph {

// register planners, init optimizer and set memory monitor
Status QueryEngine::init(std::shared_ptr<folly::IOThreadPoolExecutor> ioExecutor,
meta::MetaClient* metaClient) {
metaClient_ = metaClient;
Expand Down Expand Up @@ -54,6 +55,7 @@ void QueryEngine::execute(RequestContextPtr rctx) {
instance->execute();
}

// setup memory monitor thread
Status QueryEngine::setupMemoryMonitorThread() {
memoryMonitorThread_ = std::make_unique<thread::GenericWorker>();
if (!memoryMonitorThread_ || !memoryMonitorThread_->start("graph-memory-monitor")) {
Expand Down
15 changes: 12 additions & 3 deletions src/graph/service/QueryInstance.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,14 @@ void QueryInstance::execute() {
return;
}

if (!explainOrContinue()) {
// sentence is explain query, finish
if (explainAndFinish()) {
onFinish();
return;
}

// the execution engine converts the physical execution plan generated by the Planner into a
// series of Executors through the Scheduler to drive the execution of the Executors.
scheduler_->schedule()
.thenValue([this](Status s) {
if (s.ok()) {
Expand All @@ -66,6 +69,7 @@ Status QueryInstance::validateAndOptimize() {
auto *rctx = qctx()->rctx();
auto &spaceName = rctx->session()->space().name;
VLOG(1) << "Parsing query: " << rctx->query();
// result of parsing, get the parsing tree
auto result = GQLParser(qctx()).parse(rctx->query());
NG_RETURN_IF_ERROR(result);
sentence_ = std::move(result).value();
Expand All @@ -84,7 +88,9 @@ Status QueryInstance::validateAndOptimize() {
}
}

// validate the query, if failed, return
NG_RETURN_IF_ERROR(Validator::validate(sentence_.get(), qctx()));
// optimize the query, and get the execution plan
NG_RETURN_IF_ERROR(findBestPlan());
stats::StatsManager::addValue(kOptimizerLatencyUs, *(qctx_->plan()->optimizeTimeInUs()));
if (FLAGS_enable_space_level_metrics && spaceName != "") {
Expand All @@ -95,9 +101,10 @@ Status QueryInstance::validateAndOptimize() {
return Status::OK();
}

bool QueryInstance::explainOrContinue() {
// sentence is explain query, return the description of plan , and then finish
bool QueryInstance::explainAndFinish() {
if (sentence_->kind() != Sentence::Kind::kExplain) {
return true;
return false;
}
auto &resp = qctx_->rctx()->resp();
resp.planDesc = std::make_unique<PlanDescription>();
Expand Down Expand Up @@ -209,6 +216,7 @@ void QueryInstance::addSlowQueryStats(uint64_t latency, const std::string &space
}
}

// get result from query context and fill the response
void QueryInstance::fillRespData(ExecutionResponse *resp) {
auto ectx = DCHECK_NOTNULL(qctx_->ectx());
auto plan = DCHECK_NOTNULL(qctx_->plan());
Expand All @@ -229,6 +237,7 @@ void QueryInstance::fillRespData(ExecutionResponse *resp) {
}
}

// the entry point of the optimizer
Status QueryInstance::findBestPlan() {
auto plan = qctx_->plan();
SCOPED_TIMER(plan->optimizeTimeInUs());
Expand Down
3 changes: 2 additions & 1 deletion src/graph/service/QueryInstance.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ class QueryInstance final : public cpp::NonCopyable, public cpp::NonMovable {
explicit QueryInstance(std::unique_ptr<QueryContext> qctx, opt::Optimizer* optimizer);
~QueryInstance() = default;

// entrance of the Validate, Optimize, Schedule, Execute process
void execute();

QueryContext* qctx() const {
Expand All @@ -50,7 +51,7 @@ class QueryInstance final : public cpp::NonCopyable, public cpp::NonMovable {

Status validateAndOptimize();
// return true if continue to execute
bool explainOrContinue();
bool explainAndFinish();
void addSlowQueryStats(uint64_t latency, const std::string& spaceName) const;
void fillRespData(ExecutionResponse* resp);
Status findBestPlan();
Expand Down

0 comments on commit 73d81e9

Please sign in to comment.