Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Enhancement] Support profile for only big query #33825

Merged
merged 2 commits into from
Oct 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions be/src/exec/pipeline/fragment_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,9 @@ Status FragmentExecutor::_prepare_query_ctx(ExecEnv* exec_env, const UnifiedExec
if (query_options.__isset.enable_profile && query_options.enable_profile) {
_query_ctx->set_enable_profile();
}
if (query_options.__isset.big_query_profile_second_threshold) {
kangkaisen marked this conversation as resolved.
Show resolved Hide resolved
_query_ctx->set_big_query_profile_threshold(query_options.big_query_profile_second_threshold);
}
if (query_options.__isset.pipeline_profile_level) {
_query_ctx->set_profile_level(query_options.pipeline_profile_level);
}
Expand Down
14 changes: 13 additions & 1 deletion be/src/exec/pipeline/query_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,18 @@ class QueryContext : public std::enable_shared_from_this<QueryContext> {
duration_cast<milliseconds>(steady_clock::now().time_since_epoch() + _query_expire_seconds).count();
}
void set_enable_profile() { _enable_profile = true; }
bool enable_profile() { return _enable_profile; }
bool enable_profile() {
if (_enable_profile) {
return true;
}
if (_big_query_profile_threshold_ns <= 0) {
return false;
}
return MonotonicNanos() - _query_begin_time > _big_query_profile_threshold_ns;
}
void set_big_query_profile_threshold(int64_t big_query_profile_threshold_s) {
_big_query_profile_threshold_ns = 1'000'000'000L * big_query_profile_threshold_s;
}
void set_runtime_profile_report_interval(int64_t runtime_profile_report_interval_s) {
_runtime_profile_report_interval_ns = 1'000'000'000L * runtime_profile_report_interval_s;
}
Expand Down Expand Up @@ -202,6 +213,7 @@ class QueryContext : public std::enable_shared_from_this<QueryContext> {
std::once_flag _init_mem_tracker_once;
std::shared_ptr<RuntimeProfile> _profile;
bool _enable_profile = false;
int64_t _big_query_profile_threshold_ns = 0;
int64_t _runtime_profile_report_interval_ns = std::numeric_limits<int64_t>::max();
TPipelineProfileLevel::type _profile_level;
std::shared_ptr<MemTracker> _mem_tracker;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ public static void register() {

public static void init(ConnectContext context, Mode mode, String moduleStr) {
Tracers tracers = THREAD_LOCAL.get();
boolean enableProfile = context.getSessionVariable().isEnableProfile();
boolean enableProfile =
context.getSessionVariable().isEnableProfile() || context.getSessionVariable().isEnableBigQueryProfile();
boolean checkMV = context.getSessionVariable().isEnableMaterializedViewRewriteOrError();

Module module = getTraceModule(moduleStr);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ private void executeOnce() throws Exception {
curCoordinator.getQueryProfile().getCounterTotalTime()
.setValue(TimeUtils.getEstimatedTime(beginTimeInNanoSecond));
curCoordinator.collectProfileSync();
profile.addChild(curCoordinator.buildMergedQueryProfile());
profile.addChild(curCoordinator.buildQueryProfile(context.needMergeProfile()));

StringBuilder builder = new StringBuilder();
profile.prettyPrint(builder, "");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1058,7 +1058,7 @@ public void collectProfile() {
if (coord.getQueryProfile() != null) {
if (!isSyncStreamLoad()) {
coord.collectProfileSync();
profile.addChild(coord.buildMergedQueryProfile());
profile.addChild(coord.buildQueryProfile(session == null || session.needMergeProfile()));
} else {
profile.addChild(coord.getQueryProfile());
}
Expand Down
20 changes: 20 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/qe/ConnectContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import com.starrocks.sql.optimizer.dump.DumpInfo;
import com.starrocks.sql.optimizer.dump.QueryDumpInfo;
import com.starrocks.sql.parser.SqlParser;
import com.starrocks.thrift.TPipelineProfileLevel;
import com.starrocks.thrift.TUniqueId;
import com.starrocks.thrift.TWorkGroup;
import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -557,6 +558,25 @@ public void setLastQueryId(UUID queryId) {
this.lastQueryId = queryId;
}

public boolean isProfileEnabled() {
if (sessionVariable == null) {
return false;
}
if (sessionVariable.isEnableProfile()) {
return true;
}
if (!sessionVariable.isEnableBigQueryProfile()) {
return false;
}
return System.currentTimeMillis() - getStartTime() >
1000L * sessionVariable.getBigQueryProfileSecondThreshold();
}

public boolean needMergeProfile() {
return isProfileEnabled() &&
sessionVariable.getPipelineProfileLevel() < TPipelineProfileLevel.DETAIL.getValue();
}

public byte[] getAuthDataSalt() {
return authDataSalt;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -788,10 +788,9 @@ public void cancel(PPlanFragmentCancelReason reason, String message) {
cancelInternal(reason);
} finally {
try {
// when enable_profile is true, it disable count down profileDoneSignal for collect all backend's profile
// Disable count down profileDoneSignal for collect all backend's profile
// but if backend has crashed, we need count down profileDoneSignal since it will not report by itself
if (connectContext.getSessionVariable().isEnableProfile() &&
message.equals(FeConstants.BACKEND_NODE_NOT_FOUND_ERROR)) {
if (message.equals(FeConstants.BACKEND_NODE_NOT_FOUND_ERROR)) {
queryProfile.finishAllInstances(Status.OK);
LOG.info("count down profileDoneSignal since backend has crashed, query id: {}",
DebugUtil.printId(jobSpec.getQueryId()));
Expand All @@ -813,7 +812,7 @@ private void cancelInternal(PPlanFragmentCancelReason cancelReason) {
cancelRemoteFragmentsAsync(cancelReason);
if (cancelReason != PPlanFragmentCancelReason.LIMIT_REACH) {
// count down to zero to notify all objects waiting for this
if (!connectContext.getSessionVariable().isEnableProfile()) {
if (!connectContext.isProfileEnabled()) {
queryProfile.finishAllInstances(Status.OK);
}
}
Expand Down Expand Up @@ -1021,8 +1020,8 @@ public boolean join(int timeoutS) {
}

@Override
public RuntimeProfile buildMergedQueryProfile() {
return queryProfile.buildMergedQueryProfile();
public RuntimeProfile buildQueryProfile(boolean needMerge) {
return queryProfile.buildQueryProfile(needMerge);
}

/**
Expand Down
13 changes: 13 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,7 @@ public static MaterializedViewRewriteMode parse(String str) {
public static final String BIG_QUERY_LOG_CPU_SECOND_THRESHOLD = "big_query_log_cpu_second_threshold";
public static final String BIG_QUERY_LOG_SCAN_BYTES_THRESHOLD = "big_query_log_scan_bytes_threshold";
public static final String BIG_QUERY_LOG_SCAN_ROWS_THRESHOLD = "big_query_log_scan_rows_threshold";
public static final String BIG_QUERY_PROFILE_SECOND_THRESHOLD = "big_query_profile_second_threshold";

public static final String SQL_DIALECT = "sql_dialect";

Expand Down Expand Up @@ -847,6 +848,9 @@ public static MaterializedViewRewriteMode parse(String str) {
@VariableMgr.VarAttr(name = ENABLE_ASYNC_PROFILE, flag = VariableMgr.INVISIBLE)
private boolean enableAsyncProfile = true;

@VariableMgr.VarAttr(name = BIG_QUERY_PROFILE_SECOND_THRESHOLD)
private int bigQueryProfileSecondThreshold = 0;

@VariableMgr.VarAttr(name = RESOURCE_GROUP_ID, alias = RESOURCE_GROUP_ID_V2,
show = RESOURCE_GROUP_ID_V2, flag = VariableMgr.INVISIBLE)
private int resourceGroupId = 0;
Expand Down Expand Up @@ -1653,6 +1657,14 @@ public void setEnableLoadProfile(boolean enableLoadProfile) {
this.enableLoadProfile = enableLoadProfile;
}

public boolean isEnableBigQueryProfile() {
return bigQueryProfileSecondThreshold > 0;
}

public int getBigQueryProfileSecondThreshold() {
return bigQueryProfileSecondThreshold;
}

public int getWaitTimeoutS() {
return waitTimeout;
}
Expand Down Expand Up @@ -2778,6 +2790,7 @@ public TQueryOptions toThrift() {
tResult.setQuery_timeout(Math.min(Integer.MAX_VALUE / 1000, queryTimeoutS));
tResult.setQuery_delivery_timeout(Math.min(Integer.MAX_VALUE / 1000, queryDeliveryTimeoutS));
tResult.setEnable_profile(enableProfile);
tResult.setBig_query_profile_second_threshold(bigQueryProfileSecondThreshold);
tResult.setRuntime_profile_report_interval(runtimeProfileReportInterval);
tResult.setBatch_size(chunkSize);
tResult.setLoad_mem_limit(loadMemLimit);
Expand Down
7 changes: 4 additions & 3 deletions fe/fe-core/src/main/java/com/starrocks/qe/StmtExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -612,7 +612,7 @@ public void execute() throws Exception {
}
} finally {
boolean isAsync = false;
if (!needRetry && context.getSessionVariable().isEnableProfile()) {
if (!needRetry && context.isProfileEnabled()) {
isAsync = tryProcessProfileAsync(execPlan);
if (parsedStmt.isExplain() &&
StatementBase.ExplainLevel.ANALYZE.equals(parsedStmt.getExplainLevel())) {
Expand Down Expand Up @@ -790,6 +790,7 @@ private boolean tryProcessProfileAsync(ExecPlan plan) {
long startTime = context.getStartTime();
TUniqueId executionId = context.getExecutionId();
QueryDetail queryDetail = context.getQueryDetail();
boolean needMerge = context.needMergeProfile();

// DO NOT use context int the async task, because the context is shared among consecutive queries.
// profile of query1 maybe executed when query2 is under execution.
Expand All @@ -798,7 +799,7 @@ private boolean tryProcessProfileAsync(ExecPlan plan) {
summaryProfile.addInfoString(ProfileManager.PROFILE_COLLECT_TIME,
DebugUtil.getPrettyStringMs(System.currentTimeMillis() - profileCollectStartTime));
summaryProfile.addInfoString("IsProfileAsync", String.valueOf(isAsync));
profile.addChild(coord.buildMergedQueryProfile());
profile.addChild(coord.buildQueryProfile(needMerge));

// Update TotalTime to include the Profile Collect Time and the time to build the profile.
long now = System.currentTimeMillis();
Expand Down Expand Up @@ -1692,7 +1693,7 @@ public void handleDMLStmtWithProfile(ExecPlan execPlan, DmlStmt stmt) throws Exc
throw t;
} finally {
boolean isAsync = false;
if (context.getSessionVariable().isEnableProfile()) {
if (context.isProfileEnabled()) {
isAsync = tryProcessProfileAsync(execPlan);
if (parsedStmt.isExplain() &&
StatementBase.ExplainLevel.ANALYZE.equals(parsedStmt.getExplainLevel())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ public void cancel(String cancelledMessage) {

public abstract void setExecPlan(ExecPlan execPlan);

public abstract RuntimeProfile buildMergedQueryProfile();
public abstract RuntimeProfile buildQueryProfile(boolean needMerge);

public abstract RuntimeProfile getQueryProfile();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import com.starrocks.qe.scheduler.dag.JobSpec;
import com.starrocks.sql.plan.ExecPlan;
import com.starrocks.task.LoadEtlTask;
import com.starrocks.thrift.TPipelineProfileLevel;
import com.starrocks.thrift.TReportExecStatusParams;
import com.starrocks.thrift.TSinkCommitInfo;
import com.starrocks.thrift.TTabletCommitInfo;
Expand Down Expand Up @@ -267,15 +266,15 @@ public void updateProfile(FragmentInstanceExecState execState, TReportExecStatus
long now = System.currentTimeMillis();
long lastTime = lastRuntimeProfileUpdateTime.get();
if (topProfileSupplier != null && execPlan != null && connectContext != null &&
connectContext.getSessionVariable().isEnableProfile() &&
connectContext.isProfileEnabled() &&
// If it's the last done report, avoiding duplicate trigger
(!execState.isFinished() || profileDoneSignal.getLeftMarks().size() > 1) &&
// Interval * 0.95 * 1000 to allow a certain range of deviation
now - lastTime > (connectContext.getSessionVariable().getRuntimeProfileReportInterval() * 950L) &&
lastRuntimeProfileUpdateTime.compareAndSet(lastTime, now)) {
RuntimeProfile profile = topProfileSupplier.get();
ExecPlan plan = execPlan;
profile.addChild(buildMergedQueryProfile());
profile.addChild(buildQueryProfile(connectContext.needMergeProfile()));
ProfilingExecPlan profilingPlan = plan == null ? null : plan.getProfilingPlan();
ProfileManager.getInstance().pushProfile(profilingPlan, profile);
}
Expand Down Expand Up @@ -312,20 +311,8 @@ public void updateLoadInformation(FragmentInstanceExecState execState, TReportEx
}
}

public RuntimeProfile buildMergedQueryProfile() {
SessionVariable sessionVariable = connectContext.getSessionVariable();

if (!sessionVariable.isEnableProfile()) {
return queryProfile;
}

if (!jobSpec.isEnablePipeline()) {
return queryProfile;
}

int profileLevel = sessionVariable.getPipelineProfileLevel();
if (profileLevel >= TPipelineProfileLevel.DETAIL.getValue()) {
// We don't guarantee the detail level profile can work well with visualization feature.
public RuntimeProfile buildQueryProfile(boolean needMerge) {
if (!needMerge || !jobSpec.isEnablePipeline()) {
return queryProfile;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ public static JobSpec fromQuerySpec(ConnectContext context,
.descTable(descTable)
.enableStreamPipeline(false)
.isBlockQuery(false)
.needReport(context.getSessionVariable().isEnableProfile())
.needReport(context.getSessionVariable().isEnableProfile() ||
context.getSessionVariable().isEnableBigQueryProfile())
.queryGlobals(queryGlobals)
.queryOptions(queryOptions)
.commonProperties(context)
Expand Down
2 changes: 2 additions & 0 deletions gensrc/thrift/InternalService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,8 @@ struct TQueryOptions {
106: optional bool enable_agg_spill_preaggregation;
107: optional i64 global_runtime_filter_build_max_size;
108: optional i64 runtime_filter_rpc_http_min_size;

109: optional i64 big_query_profile_second_threshold;
}


Expand Down
Loading