Skip to content

Commit

Permalink
[Enhancement] Add a runtime config to enable log stream load request/…
Browse files Browse the repository at this point in the history
…response

Signed-off-by: meegoo <[email protected]>
  • Loading branch information
meegoo committed Dec 5, 2023
1 parent 0618253 commit c8b2386
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 4 deletions.
2 changes: 2 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1116,6 +1116,8 @@ CONF_mBool(enable_short_key_for_one_column_filter, "false");
CONF_mBool(enable_http_stream_load_limit, "false");
CONF_mInt32(finish_publish_version_internal, "100");

CONF_mBool(enable_stream_load_verbose_log, "false");

CONF_mInt32(get_txn_status_internal_sec, "30");

CONF_mBool(dump_metrics_with_bvar, "true");
Expand Down
17 changes: 13 additions & 4 deletions be/src/http/action/stream_load.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,13 @@ StreamLoadAction::StreamLoadAction(ExecEnv* exec_env, ConcurrentLimiter* limiter

StreamLoadAction::~StreamLoadAction() = default;

static void _send_reply(HttpRequest* req, const std::string& str) {
if (config::enable_stream_load_verbose_log) {
LOG(INFO) << "stream load response: " << str;
}
HttpChannel::send_reply(req, str);
}

void StreamLoadAction::handle(HttpRequest* req) {
auto* ctx = (StreamLoadContext*)req->handler_ctx();
if (ctx == nullptr) {
Expand Down Expand Up @@ -163,7 +170,7 @@ void StreamLoadAction::handle(HttpRequest* req) {
}

auto str = ctx->to_json();
HttpChannel::send_reply(req, str);
_send_reply(req, str);

// update statstics
streaming_load_requests_total.increment(1);
Expand Down Expand Up @@ -227,14 +234,16 @@ int StreamLoadAction::on_header(HttpRequest* req) {
Status::ResourceBusy(fmt::format("Stream Load exceed http cuncurrent limit {}, please try again later",
config::be_http_num_workers - 1));
auto str = ctx->to_json();
HttpChannel::send_reply(req, str);
_send_reply(req, str);
return -1;
} else {
LOG(INFO) << "new income streaming load request." << ctx->brief() << ", db=" << ctx->db
<< ", tbl=" << ctx->table;
}

VLOG(1) << "streaming load request: " << req->debug_string();
if (config::enable_stream_load_verbose_log) {
LOG(INFO) << "streaming load request: " << req->debug_string();
}

auto st = _on_header(req, ctx);
if (!st.ok()) {
Expand All @@ -247,7 +256,7 @@ int StreamLoadAction::on_header(HttpRequest* req) {
ctx->body_sink->cancel(st);
}
auto str = ctx->to_json();
HttpChannel::send_reply(req, str);
_send_reply(req, str);
streaming_load_current_processing.increment(-1);
return -1;
}
Expand Down
8 changes: 8 additions & 0 deletions be/src/http/action/transaction_stream_load.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ TransactionManagerAction::TransactionManagerAction(ExecEnv* exec_env) : _exec_en
TransactionManagerAction::~TransactionManagerAction() = default;

static void _send_reply(HttpRequest* req, const std::string& str) {
if (config::enable_stream_load_verbose_log) {
LOG(INFO) << "transaction stream load response: " << str;
}
HttpChannel::send_reply(req, str);
}

Expand Down Expand Up @@ -177,10 +180,15 @@ void TransactionStreamLoadAction::handle(HttpRequest* req) {

auto resp = _exec_env->transaction_mgr()->_build_reply(TXN_LOAD, ctx);
ctx->lock.unlock();

_send_reply(req, resp);
}

int TransactionStreamLoadAction::on_header(HttpRequest* req) {
if (config::enable_stream_load_verbose_log) {
LOG(INFO) << "transaction streaming load request: " << req->debug_string();
}

const auto& label = req->header(HTTP_LABEL_KEY);
if (label.empty()) {
_send_error_reply(req, Status::InvalidArgument(fmt::format("Invalid label {}", req->header(HTTP_LABEL_KEY))));
Expand Down

0 comments on commit c8b2386

Please sign in to comment.