Skip to content

Commit

Permalink
[Enhancement] Support profile for only big query (#33825)
Browse files Browse the repository at this point in the history
Signed-off-by: liuyehcf <[email protected]>
(cherry picked from commit 0e2d056)

# Conflicts:
#	be/src/exec/pipeline/query_context.h
#	fe/fe-core/src/main/java/com/starrocks/common/profile/Tracers.java
#	fe/fe-core/src/main/java/com/starrocks/load/loadv2/LoadLoadingTask.java
#	fe/fe-core/src/main/java/com/starrocks/load/streamload/StreamLoadTask.java
#	fe/fe-core/src/main/java/com/starrocks/qe/ConnectContext.java
#	fe/fe-core/src/main/java/com/starrocks/qe/DefaultCoordinator.java
#	fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java
#	fe/fe-core/src/main/java/com/starrocks/qe/StmtExecutor.java
#	fe/fe-core/src/main/java/com/starrocks/qe/scheduler/Coordinator.java
#	fe/fe-core/src/main/java/com/starrocks/qe/scheduler/QueryRuntimeProfile.java
#	fe/fe-core/src/main/java/com/starrocks/qe/scheduler/dag/JobSpec.java
#	gensrc/thrift/InternalService.thrift
  • Loading branch information
liuyehcf authored and mergify[bot] committed Oct 31, 2023
1 parent d766a96 commit 998ecbc
Show file tree
Hide file tree
Showing 13 changed files with 2,885 additions and 0 deletions.
3 changes: 3 additions & 0 deletions be/src/exec/pipeline/fragment_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,9 @@ Status FragmentExecutor::_prepare_query_ctx(ExecEnv* exec_env, const UnifiedExec
if (query_options.__isset.enable_profile && query_options.enable_profile) {
_query_ctx->set_report_profile();
}
if (query_options.__isset.big_query_profile_second_threshold) {
_query_ctx->set_big_query_profile_threshold(query_options.big_query_profile_second_threshold);
}
if (query_options.__isset.pipeline_profile_level) {
_query_ctx->set_profile_level(query_options.pipeline_profile_level);
}
Expand Down
26 changes: 26 additions & 0 deletions be/src/exec/pipeline/query_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,28 @@ class QueryContext : public std::enable_shared_from_this<QueryContext> {
_query_deadline =
duration_cast<milliseconds>(steady_clock::now().time_since_epoch() + _query_expire_seconds).count();
}
<<<<<<< HEAD
void set_report_profile() { _is_report_profile = true; }
bool is_report_profile() { return _is_report_profile; }
=======
void set_enable_profile() { _enable_profile = true; }
bool enable_profile() {
if (_enable_profile) {
return true;
}
if (_big_query_profile_threshold_ns <= 0) {
return false;
}
return MonotonicNanos() - _query_begin_time > _big_query_profile_threshold_ns;
}
void set_big_query_profile_threshold(int64_t big_query_profile_threshold_s) {
_big_query_profile_threshold_ns = 1'000'000'000L * big_query_profile_threshold_s;
}
void set_runtime_profile_report_interval(int64_t runtime_profile_report_interval_s) {
_runtime_profile_report_interval_ns = 1'000'000'000L * runtime_profile_report_interval_s;
}
int64_t get_runtime_profile_report_interval_ns() { return _runtime_profile_report_interval_ns; }
>>>>>>> 0e2d0569a4 ([Enhancement] Support profile for only big query (#33825))
void set_profile_level(const TPipelineProfileLevel::type& profile_level) { _profile_level = profile_level; }
const TPipelineProfileLevel::type& profile_level() { return _profile_level; }

Expand Down Expand Up @@ -173,7 +193,13 @@ class QueryContext : public std::enable_shared_from_this<QueryContext> {
bool _is_runtime_filter_coordinator = false;
std::once_flag _init_mem_tracker_once;
std::shared_ptr<RuntimeProfile> _profile;
<<<<<<< HEAD
bool _is_report_profile = false;
=======
bool _enable_profile = false;
int64_t _big_query_profile_threshold_ns = 0;
int64_t _runtime_profile_report_interval_ns = std::numeric_limits<int64_t>::max();
>>>>>>> 0e2d0569a4 ([Enhancement] Support profile for only big query (#33825))
TPipelineProfileLevel::type _profile_level;
std::shared_ptr<MemTracker> _mem_tracker;
ObjectPool _object_pool;
Expand Down
198 changes: 198 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/common/profile/Tracers.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.starrocks.common.profile;

import com.google.common.base.Stopwatch;
import com.starrocks.common.util.RuntimeProfile;
import com.starrocks.qe.ConnectContext;
import org.apache.commons.lang3.StringUtils;

import java.util.List;
import java.util.function.Function;

public class Tracers {
public enum Mode {
NONE, LOGS, VARS, TIMER, TIMING,
}

public enum Module {
NONE, ALL, BASE, OPTIMIZER, SCHEDULER, ANALYZE, MV, EXTERNAL,
}

private static final Tracer EMPTY_TRACER = new Tracer() {
};

private static final ThreadLocal<Tracers> THREAD_LOCAL = ThreadLocal.withInitial(Tracers::new);

// [empty tracer, real tracer]
private final Tracer[] allTracer = new Tracer[] {EMPTY_TRACER, EMPTY_TRACER};

// mark enable module
private int moduleMask = 0;

// mark enable mode
private int modeMask = 0;

private boolean isCommandLog = false;

private Tracer tracer(Module module, Mode mode) {
// need return real tracer when mode && module enable
// enable mode is `modeMask |= 1 << mode.ordinal()`, check mode is `(modeMask >> mode.ordinal()) & 1`, so
// when enable mode will return allTracer[1], disable will return allTracer[0]
return allTracer[(modeMask >> mode.ordinal()) & (moduleMask >> module.ordinal() & 1)];
}

public static void register(ConnectContext context) {
Tracers tracers = THREAD_LOCAL.get();
tracers.isCommandLog = StringUtils.equalsIgnoreCase("command", context.getSessionVariable().getTraceLogMode());
LogTracer logTracer = tracers.isCommandLog ? new CommandLogTracer() : new FileLogTracer();
tracers.allTracer[0] = EMPTY_TRACER;
tracers.allTracer[1] = new TracerImpl(Stopwatch.createStarted(), new TimeWatcher(), new VarTracer(), logTracer);
}

public static void register() {
// default register FileLogTracer
Tracers tracers = THREAD_LOCAL.get();
LogTracer logTracer = new FileLogTracer();
tracers.allTracer[0] = EMPTY_TRACER;
tracers.allTracer[1] = new TracerImpl(Stopwatch.createStarted(), new TimeWatcher(), new VarTracer(), logTracer);
}

public static void init(ConnectContext context, Mode mode, String moduleStr) {
Tracers tracers = THREAD_LOCAL.get();
boolean enableProfile =
context.getSessionVariable().isEnableProfile() || context.getSessionVariable().isEnableBigQueryProfile();
boolean checkMV = context.getSessionVariable().isEnableMaterializedViewRewriteOrError();

Module module = getTraceModule(moduleStr);
if (Module.NONE == module || null == module) {
tracers.moduleMask = 0;
}
if (Mode.NONE == mode || null == mode) {
tracers.modeMask = 0;
}
if (enableProfile) {
tracers.moduleMask |= 1 << Module.BASE.ordinal();
tracers.moduleMask |= 1 << Module.EXTERNAL.ordinal();
tracers.moduleMask |= 1 << Module.SCHEDULER.ordinal();

tracers.modeMask |= 1 << Mode.TIMER.ordinal();
tracers.modeMask |= 1 << Mode.VARS.ordinal();
}
if (checkMV) {
tracers.moduleMask |= 1 << Module.MV.ordinal();

tracers.modeMask |= 1 << Mode.VARS.ordinal();
}
if (Module.ALL == module) {
tracers.moduleMask = Integer.MAX_VALUE;
} else if (Module.NONE != module && null != module) {
tracers.moduleMask |= 1 << Module.BASE.ordinal();
tracers.moduleMask |= 1 << module.ordinal();
}

if (Mode.TIMING == mode) {
tracers.modeMask = Integer.MAX_VALUE;
} else if (Mode.NONE != mode && null != mode) {
tracers.modeMask |= 1 << mode.ordinal();
}
}

public static void close() {
THREAD_LOCAL.remove();
}

private static Module getTraceModule(String str) {
try {
if (str != null) {
return Module.valueOf(str.toUpperCase());
}
} catch (Exception e) {
return Module.NONE;
}
return Module.NONE;
}

public static Timer watchScope(String name) {
Tracers tracers = THREAD_LOCAL.get();
return tracers.tracer(Module.BASE, Mode.TIMER).watchScope(name);
}

public static Timer watchScope(Module module, String name) {
Tracers tracers = THREAD_LOCAL.get();
return tracers.tracer(module, Mode.TIMER).watchScope(name);
}

public static void log(Module module, String log) {
Tracers tracers = THREAD_LOCAL.get();
tracers.tracer(module, Mode.LOGS).log(log);
}

public static void log(Module module, String log, Object... args) {
Tracers tracers = THREAD_LOCAL.get();
tracers.tracer(module, Mode.LOGS).log(log, args);
}

// lazy log, use it if you want to avoid construct log string when log is disabled
public static void log(Module module, Function<Object[], String> func, Object... args) {
Tracers tracers = THREAD_LOCAL.get();
tracers.tracer(module, Mode.LOGS).log(func, args);
}

public static void log(String log, Object... args) {
Tracers tracers = THREAD_LOCAL.get();
tracers.tracer(Module.BASE, Mode.TIMER).log(log, args);
}

public static void record(Module module, String name, String value) {
Tracers tracers = THREAD_LOCAL.get();
tracers.tracer(module, Mode.VARS).record(name, value);
}

public static void count(Module module, String name, int count) {
Tracers tracers = THREAD_LOCAL.get();
tracers.tracer(module, Mode.VARS).count(name, count);
}

public static List<Var<?>> getAllVars() {
Tracers tracers = THREAD_LOCAL.get();
return tracers.allTracer[1].getAllVars();
}

public static String printScopeTimer() {
Tracers tracers = THREAD_LOCAL.get();
return tracers.allTracer[1].printScopeTimer();
}

public static String printTiming() {
Tracers tracers = THREAD_LOCAL.get();
return tracers.allTracer[1].printTiming();
}

public static String printVars() {
Tracers tracers = THREAD_LOCAL.get();
return tracers.allTracer[1].printVars();
}

public static String printLogs() {
Tracers tracers = THREAD_LOCAL.get();
return tracers.allTracer[1].printLogs();
}

public static void toRuntimeProfile(RuntimeProfile profile) {
Tracers tracers = THREAD_LOCAL.get();
tracers.allTracer[1].toRuntimeProfile(profile);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -219,9 +219,14 @@ private void executeOnce() throws Exception {

curCoordinator.getQueryProfile().getCounterTotalTime()
.setValue(TimeUtils.getEstimatedTime(beginTimeInNanoSecond));
<<<<<<< HEAD
curCoordinator.endProfile();
curCoordinator.mergeIsomorphicProfiles();
profile.addChild(curCoordinator.getQueryProfile());
=======
curCoordinator.collectProfileSync();
profile.addChild(curCoordinator.buildQueryProfile(context.needMergeProfile()));
>>>>>>> 0e2d0569a4 ([Enhancement] Support profile for only big query (#33825))

StringBuilder builder = new StringBuilder();
profile.prettyPrint(builder, "");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -952,6 +952,74 @@ public void afterCommitted(TransactionState txnState, boolean txnOperated) throw
}
}

<<<<<<< HEAD
=======
public void collectProfile() {
long currentTimestamp = System.currentTimeMillis();
long totalTimeMs = currentTimestamp - createTimeMs;

// For the usage scenarios of flink cdc or routine load,
// the frequency of stream load maybe very high, resulting in many profiles,
// but we may only care about the long-duration stream load profile.
if (totalTimeMs < Config.stream_load_profile_collect_second * 1000) {
LOG.info(String.format("Load %s, totalTimeMs %d < Config.stream_load_profile_collect_second %d)",
label, totalTimeMs, Config.stream_load_profile_collect_second));
return;
}

RuntimeProfile profile = new RuntimeProfile("Load");
RuntimeProfile summaryProfile = new RuntimeProfile("Summary");
summaryProfile.addInfoString(ProfileManager.QUERY_ID, DebugUtil.printId(loadId));
summaryProfile.addInfoString(ProfileManager.START_TIME,
TimeUtils.longToTimeString(createTimeMs));

summaryProfile.addInfoString(ProfileManager.END_TIME, TimeUtils.longToTimeString(System.currentTimeMillis()));
summaryProfile.addInfoString(ProfileManager.TOTAL_TIME, DebugUtil.getPrettyStringMs(totalTimeMs));

summaryProfile.addInfoString(ProfileManager.QUERY_TYPE, "Load");
summaryProfile.addInfoString("StarRocks Version",
String.format("%s-%s", Version.STARROCKS_VERSION, Version.STARROCKS_COMMIT_HASH));
summaryProfile.addInfoString(ProfileManager.DEFAULT_DB, dbName);

Map<String, String> loadCounters = coord.getLoadCounters();
if (loadCounters != null && loadCounters.size() != 0) {
summaryProfile.addInfoString("NumRowsNormal", loadCounters.get(LoadEtlTask.DPP_NORMAL_ALL));
summaryProfile.addInfoString("NumLoadBytesTotal", loadCounters.get(LoadJob.LOADED_BYTES));
summaryProfile.addInfoString("NumRowsAbnormal", loadCounters.get(LoadEtlTask.DPP_ABNORMAL_ALL));
summaryProfile.addInfoString("numRowsUnselected", loadCounters.get(LoadJob.UNSELECTED_ROWS));
}
ConnectContext session = ConnectContext.get();
if (session != null) {
SessionVariable variables = session.getSessionVariable();
if (variables != null) {
summaryProfile.addInfoString("NonDefaultSessionVariables", variables.getNonDefaultVariablesJson());
}
}

profile.addChild(summaryProfile);
if (coord.getQueryProfile() != null) {
if (!isSyncStreamLoad()) {
coord.collectProfileSync();
profile.addChild(coord.buildQueryProfile(session == null || session.needMergeProfile()));
} else {
profile.addChild(coord.getQueryProfile());
}
}

ProfileManager.getInstance().pushLoadProfile(profile);
}

public void setLoadState(long loadBytes, long loadRows, long filteredRows, long unselectedRows,
String errorLogUrl, String errorMsg) {
this.numRowsNormal = loadRows;
this.numRowsAbnormal = filteredRows;
this.numRowsUnselected = unselectedRows;
this.numLoadBytesTotal = loadBytes;
this.trackingUrl = errorLogUrl;
this.errorMsg = errorMsg;
}

>>>>>>> 0e2d0569a4 ([Enhancement] Support profile for only big query (#33825))
@Override
public void replayOnCommitted(TransactionState txnState) {
writeLock();
Expand Down
24 changes: 24 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/qe/ConnectContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@
import com.starrocks.sql.ast.UserVariable;
import com.starrocks.sql.optimizer.dump.DumpInfo;
import com.starrocks.sql.optimizer.dump.QueryDumpInfo;
<<<<<<< HEAD
=======
import com.starrocks.sql.parser.SqlParser;
import com.starrocks.thrift.TPipelineProfileLevel;
>>>>>>> 0e2d0569a4 ([Enhancement] Support profile for only big query (#33825))
import com.starrocks.thrift.TUniqueId;
import com.starrocks.thrift.TWorkGroup;
import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -505,6 +510,25 @@ public void setLastQueryId(UUID queryId) {
this.lastQueryId = queryId;
}

public boolean isProfileEnabled() {
if (sessionVariable == null) {
return false;
}
if (sessionVariable.isEnableProfile()) {
return true;
}
if (!sessionVariable.isEnableBigQueryProfile()) {
return false;
}
return System.currentTimeMillis() - getStartTime() >
1000L * sessionVariable.getBigQueryProfileSecondThreshold();
}

public boolean needMergeProfile() {
return isProfileEnabled() &&
sessionVariable.getPipelineProfileLevel() < TPipelineProfileLevel.DETAIL.getValue();
}

public byte[] getAuthDataSalt() {
return authDataSalt;
}
Expand Down
Loading

0 comments on commit 998ecbc

Please sign in to comment.