Skip to content

Commit

Permalink
[Enhancement] Support profile for only big query (#33825)
Browse files Browse the repository at this point in the history
Signed-off-by: liuyehcf <[email protected]>
(cherry picked from commit 0e2d056)

# Conflicts:
#	gensrc/thrift/InternalService.thrift
  • Loading branch information
liuyehcf authored and wanpengfei-git committed Oct 30, 2023
1 parent 1e2c004 commit b539a5d
Show file tree
Hide file tree
Showing 13 changed files with 78 additions and 32 deletions.
3 changes: 3 additions & 0 deletions be/src/exec/pipeline/fragment_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,9 @@ Status FragmentExecutor::_prepare_query_ctx(ExecEnv* exec_env, const UnifiedExec
if (query_options.__isset.enable_profile && query_options.enable_profile) {
_query_ctx->set_enable_profile();
}
if (query_options.__isset.big_query_profile_second_threshold) {
_query_ctx->set_big_query_profile_threshold(query_options.big_query_profile_second_threshold);
}
if (query_options.__isset.pipeline_profile_level) {
_query_ctx->set_profile_level(query_options.pipeline_profile_level);
}
Expand Down
14 changes: 13 additions & 1 deletion be/src/exec/pipeline/query_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,18 @@ class QueryContext : public std::enable_shared_from_this<QueryContext> {
duration_cast<milliseconds>(steady_clock::now().time_since_epoch() + _query_expire_seconds).count();
}
void set_enable_profile() { _enable_profile = true; }
bool enable_profile() { return _enable_profile; }
bool enable_profile() {
if (_enable_profile) {
return true;
}
if (_big_query_profile_threshold_ns <= 0) {
return false;
}
return MonotonicNanos() - _query_begin_time > _big_query_profile_threshold_ns;
}
void set_big_query_profile_threshold(int64_t big_query_profile_threshold_s) {
_big_query_profile_threshold_ns = 1'000'000'000L * big_query_profile_threshold_s;
}
void set_runtime_profile_report_interval(int64_t runtime_profile_report_interval_s) {
_runtime_profile_report_interval_ns = 1'000'000'000L * runtime_profile_report_interval_s;
}
Expand Down Expand Up @@ -202,6 +213,7 @@ class QueryContext : public std::enable_shared_from_this<QueryContext> {
std::once_flag _init_mem_tracker_once;
std::shared_ptr<RuntimeProfile> _profile;
bool _enable_profile = false;
int64_t _big_query_profile_threshold_ns = 0;
int64_t _runtime_profile_report_interval_ns = std::numeric_limits<int64_t>::max();
TPipelineProfileLevel::type _profile_level;
std::shared_ptr<MemTracker> _mem_tracker;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ public static void register() {

public static void init(ConnectContext context, Mode mode, String moduleStr) {
Tracers tracers = THREAD_LOCAL.get();
boolean enableProfile = context.getSessionVariable().isEnableProfile();
boolean enableProfile =
context.getSessionVariable().isEnableProfile() || context.getSessionVariable().isEnableBigQueryProfile();
boolean checkMV = context.getSessionVariable().isEnableMaterializedViewRewriteOrError();

Module module = getTraceModule(moduleStr);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ private void executeOnce() throws Exception {
curCoordinator.getQueryProfile().getCounterTotalTime()
.setValue(TimeUtils.getEstimatedTime(beginTimeInNanoSecond));
curCoordinator.collectProfileSync();
profile.addChild(curCoordinator.buildMergedQueryProfile());
profile.addChild(curCoordinator.buildQueryProfile(context.needMergeProfile()));

StringBuilder builder = new StringBuilder();
profile.prettyPrint(builder, "");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1058,7 +1058,7 @@ public void collectProfile() {
if (coord.getQueryProfile() != null) {
if (!isSyncStreamLoad()) {
coord.collectProfileSync();
profile.addChild(coord.buildMergedQueryProfile());
profile.addChild(coord.buildQueryProfile(session == null || session.needMergeProfile()));
} else {
profile.addChild(coord.getQueryProfile());
}
Expand Down
20 changes: 20 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/qe/ConnectContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import com.starrocks.sql.optimizer.dump.DumpInfo;
import com.starrocks.sql.optimizer.dump.QueryDumpInfo;
import com.starrocks.sql.parser.SqlParser;
import com.starrocks.thrift.TPipelineProfileLevel;
import com.starrocks.thrift.TUniqueId;
import com.starrocks.thrift.TWorkGroup;
import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -557,6 +558,25 @@ public void setLastQueryId(UUID queryId) {
this.lastQueryId = queryId;
}

public boolean isProfileEnabled() {
if (sessionVariable == null) {
return false;
}
if (sessionVariable.isEnableProfile()) {
return true;
}
if (!sessionVariable.isEnableBigQueryProfile()) {
return false;
}
return System.currentTimeMillis() - getStartTime() >
1000L * sessionVariable.getBigQueryProfileSecondThreshold();
}

public boolean needMergeProfile() {
return isProfileEnabled() &&
sessionVariable.getPipelineProfileLevel() < TPipelineProfileLevel.DETAIL.getValue();
}

public byte[] getAuthDataSalt() {
return authDataSalt;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -780,10 +780,9 @@ public void cancel(PPlanFragmentCancelReason reason, String message) {
cancelInternal(reason);
} finally {
try {
// when enable_profile is true, it disable count down profileDoneSignal for collect all backend's profile
// Disable count down profileDoneSignal for collect all backend's profile
// but if backend has crashed, we need count down profileDoneSignal since it will not report by itself
if (connectContext.getSessionVariable().isEnableProfile() &&
message.equals(FeConstants.BACKEND_NODE_NOT_FOUND_ERROR)) {
if (message.equals(FeConstants.BACKEND_NODE_NOT_FOUND_ERROR)) {
queryProfile.finishAllInstances(Status.OK);
LOG.info("count down profileDoneSignal since backend has crashed, query id: {}",
DebugUtil.printId(jobSpec.getQueryId()));
Expand All @@ -805,7 +804,7 @@ private void cancelInternal(PPlanFragmentCancelReason cancelReason) {
cancelRemoteFragmentsAsync(cancelReason);
if (cancelReason != PPlanFragmentCancelReason.LIMIT_REACH) {
// count down to zero to notify all objects waiting for this
if (!connectContext.getSessionVariable().isEnableProfile()) {
if (!connectContext.isProfileEnabled()) {
queryProfile.finishAllInstances(Status.OK);
}
}
Expand Down Expand Up @@ -1013,8 +1012,8 @@ public boolean join(int timeoutS) {
}

@Override
public RuntimeProfile buildMergedQueryProfile() {
return queryProfile.buildMergedQueryProfile();
public RuntimeProfile buildQueryProfile(boolean needMerge) {
return queryProfile.buildQueryProfile(needMerge);
}

/**
Expand Down
13 changes: 13 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,7 @@ public static MaterializedViewRewriteMode parse(String str) {
public static final String BIG_QUERY_LOG_CPU_SECOND_THRESHOLD = "big_query_log_cpu_second_threshold";
public static final String BIG_QUERY_LOG_SCAN_BYTES_THRESHOLD = "big_query_log_scan_bytes_threshold";
public static final String BIG_QUERY_LOG_SCAN_ROWS_THRESHOLD = "big_query_log_scan_rows_threshold";
public static final String BIG_QUERY_PROFILE_SECOND_THRESHOLD = "big_query_profile_second_threshold";

public static final String SQL_DIALECT = "sql_dialect";

Expand Down Expand Up @@ -830,6 +831,9 @@ public static MaterializedViewRewriteMode parse(String str) {
@VariableMgr.VarAttr(name = ENABLE_ASYNC_PROFILE, flag = VariableMgr.INVISIBLE)
private boolean enableAsyncProfile = true;

@VariableMgr.VarAttr(name = BIG_QUERY_PROFILE_SECOND_THRESHOLD)
private int bigQueryProfileSecondThreshold = 0;

@VariableMgr.VarAttr(name = RESOURCE_GROUP_ID, alias = RESOURCE_GROUP_ID_V2,
show = RESOURCE_GROUP_ID_V2, flag = VariableMgr.INVISIBLE)
private int resourceGroupId = 0;
Expand Down Expand Up @@ -1604,6 +1608,14 @@ public void setEnableLoadProfile(boolean enableLoadProfile) {
this.enableLoadProfile = enableLoadProfile;
}

public boolean isEnableBigQueryProfile() {
return bigQueryProfileSecondThreshold > 0;
}

public int getBigQueryProfileSecondThreshold() {
return bigQueryProfileSecondThreshold;
}

public int getWaitTimeoutS() {
return waitTimeout;
}
Expand Down Expand Up @@ -2688,6 +2700,7 @@ public TQueryOptions toThrift() {
tResult.setQuery_timeout(Math.min(Integer.MAX_VALUE / 1000, queryTimeoutS));
tResult.setQuery_delivery_timeout(Math.min(Integer.MAX_VALUE / 1000, queryDeliveryTimeoutS));
tResult.setEnable_profile(enableProfile);
tResult.setBig_query_profile_second_threshold(bigQueryProfileSecondThreshold);
tResult.setRuntime_profile_report_interval(runtimeProfileReportInterval);
tResult.setBatch_size(chunkSize);
tResult.setLoad_mem_limit(loadMemLimit);
Expand Down
7 changes: 4 additions & 3 deletions fe/fe-core/src/main/java/com/starrocks/qe/StmtExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -610,7 +610,7 @@ public void execute() throws Exception {
}
} finally {
boolean isAsync = false;
if (!needRetry && context.getSessionVariable().isEnableProfile()) {
if (!needRetry && context.isProfileEnabled()) {
isAsync = tryProcessProfileAsync(execPlan);
if (parsedStmt.isExplain() &&
StatementBase.ExplainLevel.ANALYZE.equals(parsedStmt.getExplainLevel())) {
Expand Down Expand Up @@ -788,6 +788,7 @@ private boolean tryProcessProfileAsync(ExecPlan plan) {
long startTime = context.getStartTime();
TUniqueId executionId = context.getExecutionId();
QueryDetail queryDetail = context.getQueryDetail();
boolean needMerge = context.needMergeProfile();

// DO NOT use context int the async task, because the context is shared among consecutive queries.
// profile of query1 maybe executed when query2 is under execution.
Expand All @@ -796,7 +797,7 @@ private boolean tryProcessProfileAsync(ExecPlan plan) {
summaryProfile.addInfoString(ProfileManager.PROFILE_COLLECT_TIME,
DebugUtil.getPrettyStringMs(System.currentTimeMillis() - profileCollectStartTime));
summaryProfile.addInfoString("IsProfileAsync", String.valueOf(isAsync));
profile.addChild(coord.buildMergedQueryProfile());
profile.addChild(coord.buildQueryProfile(needMerge));

// Update TotalTime to include the Profile Collect Time and the time to build the profile.
long now = System.currentTimeMillis();
Expand Down Expand Up @@ -1664,7 +1665,7 @@ public void handleDMLStmtWithProfile(ExecPlan execPlan, DmlStmt stmt) throws Exc
throw t;
} finally {
boolean isAsync = false;
if (context.getSessionVariable().isEnableProfile()) {
if (context.isProfileEnabled()) {
isAsync = tryProcessProfileAsync(execPlan);
if (parsedStmt.isExplain() &&
StatementBase.ExplainLevel.ANALYZE.equals(parsedStmt.getExplainLevel())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ public void cancel(String cancelledMessage) {

public abstract void setExecPlan(ExecPlan execPlan);

public abstract RuntimeProfile buildMergedQueryProfile();
public abstract RuntimeProfile buildQueryProfile(boolean needMerge);

public abstract RuntimeProfile getQueryProfile();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import com.starrocks.qe.scheduler.dag.JobSpec;
import com.starrocks.sql.plan.ExecPlan;
import com.starrocks.task.LoadEtlTask;
import com.starrocks.thrift.TPipelineProfileLevel;
import com.starrocks.thrift.TReportExecStatusParams;
import com.starrocks.thrift.TSinkCommitInfo;
import com.starrocks.thrift.TTabletCommitInfo;
Expand Down Expand Up @@ -267,15 +266,15 @@ public void updateProfile(FragmentInstanceExecState execState, TReportExecStatus
long now = System.currentTimeMillis();
long lastTime = lastRuntimeProfileUpdateTime.get();
if (topProfileSupplier != null && execPlan != null && connectContext != null &&
connectContext.getSessionVariable().isEnableProfile() &&
connectContext.isProfileEnabled() &&
// If it's the last done report, avoiding duplicate trigger
(!execState.isFinished() || profileDoneSignal.getLeftMarks().size() > 1) &&
// Interval * 0.95 * 1000 to allow a certain range of deviation
now - lastTime > (connectContext.getSessionVariable().getRuntimeProfileReportInterval() * 950L) &&
lastRuntimeProfileUpdateTime.compareAndSet(lastTime, now)) {
RuntimeProfile profile = topProfileSupplier.get();
ExecPlan plan = execPlan;
profile.addChild(buildMergedQueryProfile());
profile.addChild(buildQueryProfile(connectContext.needMergeProfile()));
ProfilingExecPlan profilingPlan = plan == null ? null : plan.getProfilingPlan();
ProfileManager.getInstance().pushProfile(profilingPlan, profile);
}
Expand Down Expand Up @@ -312,20 +311,8 @@ public void updateLoadInformation(FragmentInstanceExecState execState, TReportEx
}
}

public RuntimeProfile buildMergedQueryProfile() {
SessionVariable sessionVariable = connectContext.getSessionVariable();

if (!sessionVariable.isEnableProfile()) {
return queryProfile;
}

if (!jobSpec.isEnablePipeline()) {
return queryProfile;
}

int profileLevel = sessionVariable.getPipelineProfileLevel();
if (profileLevel >= TPipelineProfileLevel.DETAIL.getValue()) {
// We don't guarantee the detail level profile can work well with visualization feature.
public RuntimeProfile buildQueryProfile(boolean needMerge) {
if (!needMerge || !jobSpec.isEnablePipeline()) {
return queryProfile;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ public static JobSpec fromQuerySpec(ConnectContext context,
.descTable(descTable)
.enableStreamPipeline(false)
.isBlockQuery(false)
.needReport(context.getSessionVariable().isEnableProfile())
.needReport(context.getSessionVariable().isEnableProfile() ||
context.getSessionVariable().isEnableBigQueryProfile())
.queryGlobals(queryGlobals)
.queryOptions(queryOptions)
.commonProperties(context)
Expand Down
9 changes: 9 additions & 0 deletions gensrc/thrift/InternalService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,15 @@ struct TQueryOptions {

104: optional TOverflowMode overflow_mode = TOverflowMode.OUTPUT_NULL;
105: optional bool use_column_pool = true;
<<<<<<< HEAD
=======

106: optional bool enable_agg_spill_preaggregation;
107: optional i64 global_runtime_filter_build_max_size;
108: optional i64 runtime_filter_rpc_http_min_size;

109: optional i64 big_query_profile_second_threshold;
>>>>>>> 0e2d0569a4 ([Enhancement] Support profile for only big query (#33825))
}


Expand Down

0 comments on commit b539a5d

Please sign in to comment.