Skip to content

Commit

Permalink
[Enhancement] Support profile only for long-running profile
Browse files Browse the repository at this point in the history
Signed-off-by: liuyehcf <[email protected]>
  • Loading branch information
liuyehcf committed Oct 27, 2023
1 parent 5a5f900 commit aed6a99
Show file tree
Hide file tree
Showing 9 changed files with 48 additions and 10 deletions.
3 changes: 3 additions & 0 deletions be/src/exec/pipeline/fragment_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,9 @@ Status FragmentExecutor::_prepare_query_ctx(ExecEnv* exec_env, const UnifiedExec
if (query_options.__isset.enable_profile && query_options.enable_profile) {
_query_ctx->set_enable_profile();
}
if (query_options.__isset.enable_runtime_profile && query_options.enable_runtime_profile) {
_query_ctx->set_enable_runtime_profile();
}
if (query_options.__isset.pipeline_profile_level) {
_query_ctx->set_profile_level(query_options.pipeline_profile_level);
}
Expand Down
12 changes: 11 additions & 1 deletion be/src/exec/pipeline/query_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,16 @@ class QueryContext : public std::enable_shared_from_this<QueryContext> {
duration_cast<milliseconds>(steady_clock::now().time_since_epoch() + _query_expire_seconds).count();
}
void set_enable_profile() { _enable_profile = true; }
bool enable_profile() { return _enable_profile; }
bool enable_profile() {
if (_enable_profile) {
return true;
}
if (!_enable_runtime_profile) {
return false;
}
return (MonotonicNanos() - _query_begin_time) > _runtime_profile_report_interval_ns;
}
void set_enable_runtime_profile() { _enable_runtime_profile = true; }
void set_runtime_profile_report_interval(int64_t runtime_profile_report_interval_s) {
_runtime_profile_report_interval_ns = 1'000'000'000L * runtime_profile_report_interval_s;
}
Expand Down Expand Up @@ -202,6 +211,7 @@ class QueryContext : public std::enable_shared_from_this<QueryContext> {
std::once_flag _init_mem_tracker_once;
std::shared_ptr<RuntimeProfile> _profile;
bool _enable_profile = false;
bool _enable_runtime_profile = false;
int64_t _runtime_profile_report_interval_ns = std::numeric_limits<int64_t>::max();
TPipelineProfileLevel::type _profile_level;
std::shared_ptr<MemTracker> _mem_tracker;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ public static void register() {

public static void init(ConnectContext context, Mode mode, String moduleStr) {
Tracers tracers = THREAD_LOCAL.get();
boolean enableProfile = context.getSessionVariable().isEnableProfile();
boolean enableProfile =
context.getSessionVariable().isEnableProfile() || context.getSessionVariable().isEnableRuntimeProfile();
boolean checkMV = context.getSessionVariable().isEnableMaterializedViewRewriteOrError();

Module module = getTraceModule(moduleStr);
Expand Down
14 changes: 14 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/qe/ConnectContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -557,6 +557,20 @@ public void setLastQueryId(UUID queryId) {
this.lastQueryId = queryId;
}

public boolean isProfileEnabled() {
if (sessionVariable == null) {
return false;
}
if (sessionVariable.isEnableProfile()) {
return true;
}
if (!sessionVariable.isEnableRuntimeProfile()) {
return false;
}
return System.currentTimeMillis() - getStartTime() >
1000L * sessionVariable.getRuntimeProfileReportInterval();
}

public byte[] getAuthDataSalt() {
return authDataSalt;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -788,10 +788,9 @@ public void cancel(PPlanFragmentCancelReason reason, String message) {
cancelInternal(reason);
} finally {
try {
// when enable_profile is true, it disable count down profileDoneSignal for collect all backend's profile
// Disable count down profileDoneSignal for collect all backend's profile
// but if backend has crashed, we need count down profileDoneSignal since it will not report by itself
if (connectContext.getSessionVariable().isEnableProfile() &&
message.equals(FeConstants.BACKEND_NODE_NOT_FOUND_ERROR)) {
if (message.equals(FeConstants.BACKEND_NODE_NOT_FOUND_ERROR)) {
queryProfile.finishAllInstances(Status.OK);
LOG.info("count down profileDoneSignal since backend has crashed, query id: {}",
DebugUtil.printId(jobSpec.getQueryId()));
Expand All @@ -813,7 +812,7 @@ private void cancelInternal(PPlanFragmentCancelReason cancelReason) {
cancelRemoteFragmentsAsync(cancelReason);
if (cancelReason != PPlanFragmentCancelReason.LIMIT_REACH) {
// count down to zero to notify all objects waiting for this
if (!connectContext.getSessionVariable().isEnableProfile()) {
if (!connectContext.isProfileEnabled()) {
queryProfile.finishAllInstances(Status.OK);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ public class SessionVariable implements Serializable, Writable, Cloneable {
public static final String PROFILE_LIMIT_FOLD = "profile_limit_fold";
public static final String PIPELINE_PROFILE_LEVEL = "pipeline_profile_level";
public static final String ENABLE_ASYNC_PROFILE = "enable_async_profile";
public static final String ENABLE_RUNTIME_PROFILE = "enable_runtime_profile";

public static final String RESOURCE_GROUP_ID = "workgroup_id";
public static final String RESOURCE_GROUP_ID_V2 = "resource_group_id";
Expand Down Expand Up @@ -846,6 +847,9 @@ public static MaterializedViewRewriteMode parse(String str) {
@VariableMgr.VarAttr(name = ENABLE_ASYNC_PROFILE, flag = VariableMgr.INVISIBLE)
private boolean enableAsyncProfile = true;

@VariableMgr.VarAttr(name = ENABLE_RUNTIME_PROFILE)
private boolean enableRuntimeProfile = false;

@VariableMgr.VarAttr(name = RESOURCE_GROUP_ID, alias = RESOURCE_GROUP_ID_V2,
show = RESOURCE_GROUP_ID_V2, flag = VariableMgr.INVISIBLE)
private int resourceGroupId = 0;
Expand Down Expand Up @@ -1650,6 +1654,10 @@ public void setEnableLoadProfile(boolean enableLoadProfile) {
this.enableLoadProfile = enableLoadProfile;
}

public boolean isEnableRuntimeProfile() {
return enableRuntimeProfile;
}

public int getWaitTimeoutS() {
return waitTimeout;
}
Expand Down Expand Up @@ -2775,6 +2783,7 @@ public TQueryOptions toThrift() {
tResult.setQuery_timeout(Math.min(Integer.MAX_VALUE / 1000, queryTimeoutS));
tResult.setQuery_delivery_timeout(Math.min(Integer.MAX_VALUE / 1000, queryDeliveryTimeoutS));
tResult.setEnable_profile(enableProfile);
tResult.setEnable_runtime_profile(enableRuntimeProfile);
tResult.setRuntime_profile_report_interval(runtimeProfileReportInterval);
tResult.setBatch_size(chunkSize);
tResult.setLoad_mem_limit(loadMemLimit);
Expand Down
4 changes: 2 additions & 2 deletions fe/fe-core/src/main/java/com/starrocks/qe/StmtExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -612,7 +612,7 @@ public void execute() throws Exception {
}
} finally {
boolean isAsync = false;
if (!needRetry && context.getSessionVariable().isEnableProfile()) {
if (!needRetry && context.isProfileEnabled()) {
isAsync = tryProcessProfileAsync(execPlan);
if (parsedStmt.isExplain() &&
StatementBase.ExplainLevel.ANALYZE.equals(parsedStmt.getExplainLevel())) {
Expand Down Expand Up @@ -1692,7 +1692,7 @@ public void handleDMLStmtWithProfile(ExecPlan execPlan, DmlStmt stmt) throws Exc
throw t;
} finally {
boolean isAsync = false;
if (context.getSessionVariable().isEnableProfile()) {
if (context.isProfileEnabled()) {
isAsync = tryProcessProfileAsync(execPlan);
if (parsedStmt.isExplain() &&
StatementBase.ExplainLevel.ANALYZE.equals(parsedStmt.getExplainLevel())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ public void updateProfile(FragmentInstanceExecState execState, TReportExecStatus
long now = System.currentTimeMillis();
long lastTime = lastRuntimeProfileUpdateTime.get();
if (topProfileSupplier != null && execPlan != null && connectContext != null &&
connectContext.getSessionVariable().isEnableProfile() &&
connectContext.isProfileEnabled() &&
// If it's the last done report, avoiding duplicate trigger
(!execState.isFinished() || profileDoneSignal.getLeftMarks().size() > 1) &&
// Interval * 0.95 * 1000 to allow a certain range of deviation
Expand Down Expand Up @@ -315,7 +315,7 @@ public void updateLoadInformation(FragmentInstanceExecState execState, TReportEx
public RuntimeProfile buildMergedQueryProfile() {
SessionVariable sessionVariable = connectContext.getSessionVariable();

if (!sessionVariable.isEnableProfile()) {
if (!connectContext.isProfileEnabled()) {
return queryProfile;
}

Expand Down
2 changes: 2 additions & 0 deletions gensrc/thrift/InternalService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,8 @@ struct TQueryOptions {

106: optional bool enable_agg_spill_preaggregation;
107: optional i64 global_runtime_filter_build_max_size;

108: optional bool enable_runtime_profile = false;
}


Expand Down

0 comments on commit aed6a99

Please sign in to comment.