Skip to content

Commit

Permalink
[Enhancement] Support profile for only big query (#33825)
Browse files Browse the repository at this point in the history
Signed-off-by: liuyehcf <[email protected]>
(cherry picked from commit 0e2d056)

# Conflicts:
#	fe/fe-core/src/main/java/com/starrocks/common/profile/Tracers.java
#	fe/fe-core/src/main/java/com/starrocks/qe/DefaultCoordinator.java
#	fe/fe-core/src/main/java/com/starrocks/qe/StmtExecutor.java
#	fe/fe-core/src/main/java/com/starrocks/qe/scheduler/Coordinator.java
#	fe/fe-core/src/main/java/com/starrocks/qe/scheduler/QueryRuntimeProfile.java
#	fe/fe-core/src/main/java/com/starrocks/qe/scheduler/dag/JobSpec.java
#	gensrc/thrift/InternalService.thrift
  • Loading branch information
liuyehcf authored and mergify[bot] committed Oct 30, 2023
1 parent 0b1e8ec commit 31977f9
Show file tree
Hide file tree
Showing 13 changed files with 2,702 additions and 5 deletions.
3 changes: 3 additions & 0 deletions be/src/exec/pipeline/fragment_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,9 @@ Status FragmentExecutor::_prepare_query_ctx(ExecEnv* exec_env, const UnifiedExec
if (query_options.__isset.enable_profile && query_options.enable_profile) {
_query_ctx->set_enable_profile();
}
if (query_options.__isset.big_query_profile_second_threshold) {
_query_ctx->set_big_query_profile_threshold(query_options.big_query_profile_second_threshold);
}
if (query_options.__isset.pipeline_profile_level) {
_query_ctx->set_profile_level(query_options.pipeline_profile_level);
}
Expand Down
14 changes: 13 additions & 1 deletion be/src/exec/pipeline/query_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,18 @@ class QueryContext : public std::enable_shared_from_this<QueryContext> {
duration_cast<milliseconds>(steady_clock::now().time_since_epoch() + _query_expire_seconds).count();
}
void set_enable_profile() { _enable_profile = true; }
bool enable_profile() { return _enable_profile; }
bool enable_profile() {
if (_enable_profile) {
return true;
}
if (_big_query_profile_threshold_ns <= 0) {
return false;
}
return MonotonicNanos() - _query_begin_time > _big_query_profile_threshold_ns;
}
void set_big_query_profile_threshold(int64_t big_query_profile_threshold_s) {
_big_query_profile_threshold_ns = 1'000'000'000L * big_query_profile_threshold_s;
}
void set_runtime_profile_report_interval(int64_t runtime_profile_report_interval_s) {
_runtime_profile_report_interval_ns = 1'000'000'000L * runtime_profile_report_interval_s;
}
Expand Down Expand Up @@ -202,6 +213,7 @@ class QueryContext : public std::enable_shared_from_this<QueryContext> {
std::once_flag _init_mem_tracker_once;
std::shared_ptr<RuntimeProfile> _profile;
bool _enable_profile = false;
int64_t _big_query_profile_threshold_ns = 0;
int64_t _runtime_profile_report_interval_ns = std::numeric_limits<int64_t>::max();
TPipelineProfileLevel::type _profile_level;
std::shared_ptr<MemTracker> _mem_tracker;
Expand Down
198 changes: 198 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/common/profile/Tracers.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.starrocks.common.profile;

import com.google.common.base.Stopwatch;
import com.starrocks.common.util.RuntimeProfile;
import com.starrocks.qe.ConnectContext;
import org.apache.commons.lang3.StringUtils;

import java.util.List;
import java.util.function.Function;

public class Tracers {
public enum Mode {
NONE, LOGS, VARS, TIMER, TIMING,
}

public enum Module {
NONE, ALL, BASE, OPTIMIZER, SCHEDULER, ANALYZE, MV, EXTERNAL,
}

private static final Tracer EMPTY_TRACER = new Tracer() {
};

private static final ThreadLocal<Tracers> THREAD_LOCAL = ThreadLocal.withInitial(Tracers::new);

// [empty tracer, real tracer]
private final Tracer[] allTracer = new Tracer[] {EMPTY_TRACER, EMPTY_TRACER};

// mark enable module
private int moduleMask = 0;

// mark enable mode
private int modeMask = 0;

private boolean isCommandLog = false;

private Tracer tracer(Module module, Mode mode) {
// need return real tracer when mode && module enable
// enable mode is `modeMask |= 1 << mode.ordinal()`, check mode is `(modeMask >> mode.ordinal()) & 1`, so
// when enable mode will return allTracer[1], disable will return allTracer[0]
return allTracer[(modeMask >> mode.ordinal()) & (moduleMask >> module.ordinal() & 1)];
}

public static void register(ConnectContext context) {
Tracers tracers = THREAD_LOCAL.get();
tracers.isCommandLog = StringUtils.equalsIgnoreCase("command", context.getSessionVariable().getTraceLogMode());
LogTracer logTracer = tracers.isCommandLog ? new CommandLogTracer() : new FileLogTracer();
tracers.allTracer[0] = EMPTY_TRACER;
tracers.allTracer[1] = new TracerImpl(Stopwatch.createStarted(), new TimeWatcher(), new VarTracer(), logTracer);
}

public static void register() {
// default register FileLogTracer
Tracers tracers = THREAD_LOCAL.get();
LogTracer logTracer = new FileLogTracer();
tracers.allTracer[0] = EMPTY_TRACER;
tracers.allTracer[1] = new TracerImpl(Stopwatch.createStarted(), new TimeWatcher(), new VarTracer(), logTracer);
}

public static void init(ConnectContext context, Mode mode, String moduleStr) {
Tracers tracers = THREAD_LOCAL.get();
boolean enableProfile =
context.getSessionVariable().isEnableProfile() || context.getSessionVariable().isEnableBigQueryProfile();
boolean checkMV = context.getSessionVariable().isEnableMaterializedViewRewriteOrError();

Module module = getTraceModule(moduleStr);
if (Module.NONE == module || null == module) {
tracers.moduleMask = 0;
}
if (Mode.NONE == mode || null == mode) {
tracers.modeMask = 0;
}
if (enableProfile) {
tracers.moduleMask |= 1 << Module.BASE.ordinal();
tracers.moduleMask |= 1 << Module.EXTERNAL.ordinal();
tracers.moduleMask |= 1 << Module.SCHEDULER.ordinal();

tracers.modeMask |= 1 << Mode.TIMER.ordinal();
tracers.modeMask |= 1 << Mode.VARS.ordinal();
}
if (checkMV) {
tracers.moduleMask |= 1 << Module.MV.ordinal();

tracers.modeMask |= 1 << Mode.VARS.ordinal();
}
if (Module.ALL == module) {
tracers.moduleMask = Integer.MAX_VALUE;
} else if (Module.NONE != module && null != module) {
tracers.moduleMask |= 1 << Module.BASE.ordinal();
tracers.moduleMask |= 1 << module.ordinal();
}

if (Mode.TIMING == mode) {
tracers.modeMask = Integer.MAX_VALUE;
} else if (Mode.NONE != mode && null != mode) {
tracers.modeMask |= 1 << mode.ordinal();
}
}

public static void close() {
THREAD_LOCAL.remove();
}

private static Module getTraceModule(String str) {
try {
if (str != null) {
return Module.valueOf(str.toUpperCase());
}
} catch (Exception e) {
return Module.NONE;
}
return Module.NONE;
}

public static Timer watchScope(String name) {
Tracers tracers = THREAD_LOCAL.get();
return tracers.tracer(Module.BASE, Mode.TIMER).watchScope(name);
}

public static Timer watchScope(Module module, String name) {
Tracers tracers = THREAD_LOCAL.get();
return tracers.tracer(module, Mode.TIMER).watchScope(name);
}

public static void log(Module module, String log) {
Tracers tracers = THREAD_LOCAL.get();
tracers.tracer(module, Mode.LOGS).log(log);
}

public static void log(Module module, String log, Object... args) {
Tracers tracers = THREAD_LOCAL.get();
tracers.tracer(module, Mode.LOGS).log(log, args);
}

// lazy log, use it if you want to avoid construct log string when log is disabled
public static void log(Module module, Function<Object[], String> func, Object... args) {
Tracers tracers = THREAD_LOCAL.get();
tracers.tracer(module, Mode.LOGS).log(func, args);
}

public static void log(String log, Object... args) {
Tracers tracers = THREAD_LOCAL.get();
tracers.tracer(Module.BASE, Mode.TIMER).log(log, args);
}

public static void record(Module module, String name, String value) {
Tracers tracers = THREAD_LOCAL.get();
tracers.tracer(module, Mode.VARS).record(name, value);
}

public static void count(Module module, String name, int count) {
Tracers tracers = THREAD_LOCAL.get();
tracers.tracer(module, Mode.VARS).count(name, count);
}

public static List<Var<?>> getAllVars() {
Tracers tracers = THREAD_LOCAL.get();
return tracers.allTracer[1].getAllVars();
}

public static String printScopeTimer() {
Tracers tracers = THREAD_LOCAL.get();
return tracers.allTracer[1].printScopeTimer();
}

public static String printTiming() {
Tracers tracers = THREAD_LOCAL.get();
return tracers.allTracer[1].printTiming();
}

public static String printVars() {
Tracers tracers = THREAD_LOCAL.get();
return tracers.allTracer[1].printVars();
}

public static String printLogs() {
Tracers tracers = THREAD_LOCAL.get();
return tracers.allTracer[1].printLogs();
}

public static void toRuntimeProfile(RuntimeProfile profile) {
Tracers tracers = THREAD_LOCAL.get();
tracers.allTracer[1].toRuntimeProfile(profile);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ private void executeOnce() throws Exception {
curCoordinator.getQueryProfile().getCounterTotalTime()
.setValue(TimeUtils.getEstimatedTime(beginTimeInNanoSecond));
curCoordinator.collectProfileSync();
profile.addChild(curCoordinator.buildMergedQueryProfile());
profile.addChild(curCoordinator.buildQueryProfile(context.needMergeProfile()));

StringBuilder builder = new StringBuilder();
profile.prettyPrint(builder, "");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1074,7 +1074,7 @@ public void collectProfile() {
if (coord.getQueryProfile() != null) {
if (!isSyncStreamLoad()) {
coord.collectProfileSync();
profile.addChild(coord.buildMergedQueryProfile());
profile.addChild(coord.buildQueryProfile(session == null || session.needMergeProfile()));
} else {
profile.addChild(coord.getQueryProfile());
}
Expand Down
20 changes: 20 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/qe/ConnectContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import com.starrocks.sql.optimizer.dump.DumpInfo;
import com.starrocks.sql.optimizer.dump.QueryDumpInfo;
import com.starrocks.sql.parser.SqlParser;
import com.starrocks.thrift.TPipelineProfileLevel;
import com.starrocks.thrift.TUniqueId;
import com.starrocks.thrift.TWorkGroup;
import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -542,6 +543,25 @@ public void setLastQueryId(UUID queryId) {
this.lastQueryId = queryId;
}

public boolean isProfileEnabled() {
if (sessionVariable == null) {
return false;
}
if (sessionVariable.isEnableProfile()) {
return true;
}
if (!sessionVariable.isEnableBigQueryProfile()) {
return false;
}
return System.currentTimeMillis() - getStartTime() >
1000L * sessionVariable.getBigQueryProfileSecondThreshold();
}

public boolean needMergeProfile() {
return isProfileEnabled() &&
sessionVariable.getPipelineProfileLevel() < TPipelineProfileLevel.DETAIL.getValue();
}

public byte[] getAuthDataSalt() {
return authDataSalt;
}
Expand Down
Loading

0 comments on commit 31977f9

Please sign in to comment.