Skip to content

Commit

Permalink
[Enhancement] Support profile for only big query
Browse files Browse the repository at this point in the history
Signed-off-by: liuyehcf <[email protected]>
  • Loading branch information
liuyehcf committed Oct 30, 2023
1 parent 5a5f900 commit 8057ffc
Show file tree
Hide file tree
Showing 10 changed files with 57 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ public static void register() {

public static void init(ConnectContext context, Mode mode, String moduleStr) {
Tracers tracers = THREAD_LOCAL.get();
boolean enableProfile = context.getSessionVariable().isEnableProfile();
boolean enableProfile =
context.getSessionVariable().isEnableProfile() || context.getSessionVariable().isEnableBigQueryProfile();
boolean checkMV = context.getSessionVariable().isEnableMaterializedViewRewriteOrError();

Module module = getTraceModule(moduleStr);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ private void executeOnce() throws Exception {
curCoordinator.getQueryProfile().getCounterTotalTime()
.setValue(TimeUtils.getEstimatedTime(beginTimeInNanoSecond));
curCoordinator.collectProfileSync();
profile.addChild(curCoordinator.buildMergedQueryProfile());
profile.addChild(curCoordinator.buildQueryProfile(context.needMergeProfile()));

StringBuilder builder = new StringBuilder();
profile.prettyPrint(builder, "");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1058,7 +1058,7 @@ public void collectProfile() {
if (coord.getQueryProfile() != null) {
if (!isSyncStreamLoad()) {
coord.collectProfileSync();
profile.addChild(coord.buildMergedQueryProfile());
profile.addChild(coord.buildQueryProfile(session == null || session.needMergeProfile()));
} else {
profile.addChild(coord.getQueryProfile());
}
Expand Down
20 changes: 20 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/qe/ConnectContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import com.starrocks.sql.optimizer.dump.DumpInfo;
import com.starrocks.sql.optimizer.dump.QueryDumpInfo;
import com.starrocks.sql.parser.SqlParser;
import com.starrocks.thrift.TPipelineProfileLevel;
import com.starrocks.thrift.TUniqueId;
import com.starrocks.thrift.TWorkGroup;
import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -557,6 +558,25 @@ public void setLastQueryId(UUID queryId) {
this.lastQueryId = queryId;
}

public boolean isProfileEnabled() {
if (sessionVariable == null) {
return false;
}
if (sessionVariable.isEnableProfile()) {
return true;
}
if (!sessionVariable.isEnableBigQueryProfile()) {
return false;
}
return System.currentTimeMillis() - getStartTime() >
1000L * sessionVariable.getBigQueryProfileSecondThreshold();
}

public boolean needMergeProfile() {
return isProfileEnabled() &&
sessionVariable.getPipelineProfileLevel() < TPipelineProfileLevel.DETAIL.getValue();
}

public byte[] getAuthDataSalt() {
return authDataSalt;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -788,10 +788,9 @@ public void cancel(PPlanFragmentCancelReason reason, String message) {
cancelInternal(reason);
} finally {
try {
// when enable_profile is true, it disable count down profileDoneSignal for collect all backend's profile
// Disable count down profileDoneSignal for collect all backend's profile
// but if backend has crashed, we need count down profileDoneSignal since it will not report by itself
if (connectContext.getSessionVariable().isEnableProfile() &&
message.equals(FeConstants.BACKEND_NODE_NOT_FOUND_ERROR)) {
if (message.equals(FeConstants.BACKEND_NODE_NOT_FOUND_ERROR)) {
queryProfile.finishAllInstances(Status.OK);
LOG.info("count down profileDoneSignal since backend has crashed, query id: {}",
DebugUtil.printId(jobSpec.getQueryId()));
Expand All @@ -813,7 +812,7 @@ private void cancelInternal(PPlanFragmentCancelReason cancelReason) {
cancelRemoteFragmentsAsync(cancelReason);
if (cancelReason != PPlanFragmentCancelReason.LIMIT_REACH) {
// count down to zero to notify all objects waiting for this
if (!connectContext.getSessionVariable().isEnableProfile()) {
if (!connectContext.isProfileEnabled()) {
queryProfile.finishAllInstances(Status.OK);
}
}
Expand Down Expand Up @@ -1021,8 +1020,8 @@ public boolean join(int timeoutS) {
}

@Override
public RuntimeProfile buildMergedQueryProfile() {
return queryProfile.buildMergedQueryProfile();
public RuntimeProfile buildQueryProfile(boolean needMerge) {
return queryProfile.buildQueryProfile(needMerge);
}

/**
Expand Down
18 changes: 17 additions & 1 deletion fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,8 @@ public static MaterializedViewRewriteMode parse(String str) {
public static final String BIG_QUERY_LOG_CPU_SECOND_THRESHOLD = "big_query_log_cpu_second_threshold";
public static final String BIG_QUERY_LOG_SCAN_BYTES_THRESHOLD = "big_query_log_scan_bytes_threshold";
public static final String BIG_QUERY_LOG_SCAN_ROWS_THRESHOLD = "big_query_log_scan_rows_threshold";
public static final String ENABLE_BIG_QUERY_PROFILE = "enable_big_query_profile";
public static final String BIG_QUERY_PROFILE_SECOND_THRESHOLD = "big_query_profile_second_threshold";

public static final String SQL_DIALECT = "sql_dialect";

Expand Down Expand Up @@ -846,6 +848,12 @@ public static MaterializedViewRewriteMode parse(String str) {
@VariableMgr.VarAttr(name = ENABLE_ASYNC_PROFILE, flag = VariableMgr.INVISIBLE)
private boolean enableAsyncProfile = true;

@VariableMgr.VarAttr(name = ENABLE_BIG_QUERY_PROFILE)
private boolean enableBigQueryProfile = false;

@VariableMgr.VarAttr(name = BIG_QUERY_PROFILE_SECOND_THRESHOLD)
private int bigQueryProfileSecondThreshold = 30;

@VariableMgr.VarAttr(name = RESOURCE_GROUP_ID, alias = RESOURCE_GROUP_ID_V2,
show = RESOURCE_GROUP_ID_V2, flag = VariableMgr.INVISIBLE)
private int resourceGroupId = 0;
Expand Down Expand Up @@ -1650,6 +1658,14 @@ public void setEnableLoadProfile(boolean enableLoadProfile) {
this.enableLoadProfile = enableLoadProfile;
}

public boolean isEnableBigQueryProfile() {
return enableBigQueryProfile;
}

public int getBigQueryProfileSecondThreshold() {
return bigQueryProfileSecondThreshold;
}

public int getWaitTimeoutS() {
return waitTimeout;
}
Expand Down Expand Up @@ -2774,7 +2790,7 @@ public TQueryOptions toThrift() {
// Avoid integer overflow
tResult.setQuery_timeout(Math.min(Integer.MAX_VALUE / 1000, queryTimeoutS));
tResult.setQuery_delivery_timeout(Math.min(Integer.MAX_VALUE / 1000, queryDeliveryTimeoutS));
tResult.setEnable_profile(enableProfile);
tResult.setEnable_profile(enableProfile || enableBigQueryProfile);
tResult.setRuntime_profile_report_interval(runtimeProfileReportInterval);
tResult.setBatch_size(chunkSize);
tResult.setLoad_mem_limit(loadMemLimit);
Expand Down
7 changes: 4 additions & 3 deletions fe/fe-core/src/main/java/com/starrocks/qe/StmtExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -612,7 +612,7 @@ public void execute() throws Exception {
}
} finally {
boolean isAsync = false;
if (!needRetry && context.getSessionVariable().isEnableProfile()) {
if (!needRetry && context.isProfileEnabled()) {
isAsync = tryProcessProfileAsync(execPlan);
if (parsedStmt.isExplain() &&
StatementBase.ExplainLevel.ANALYZE.equals(parsedStmt.getExplainLevel())) {
Expand Down Expand Up @@ -790,6 +790,7 @@ private boolean tryProcessProfileAsync(ExecPlan plan) {
long startTime = context.getStartTime();
TUniqueId executionId = context.getExecutionId();
QueryDetail queryDetail = context.getQueryDetail();
boolean needMerge = context.needMergeProfile();

// DO NOT use context int the async task, because the context is shared among consecutive queries.
// profile of query1 maybe executed when query2 is under execution.
Expand All @@ -798,7 +799,7 @@ private boolean tryProcessProfileAsync(ExecPlan plan) {
summaryProfile.addInfoString(ProfileManager.PROFILE_COLLECT_TIME,
DebugUtil.getPrettyStringMs(System.currentTimeMillis() - profileCollectStartTime));
summaryProfile.addInfoString("IsProfileAsync", String.valueOf(isAsync));
profile.addChild(coord.buildMergedQueryProfile());
profile.addChild(coord.buildQueryProfile(needMerge));

// Update TotalTime to include the Profile Collect Time and the time to build the profile.
long now = System.currentTimeMillis();
Expand Down Expand Up @@ -1692,7 +1693,7 @@ public void handleDMLStmtWithProfile(ExecPlan execPlan, DmlStmt stmt) throws Exc
throw t;
} finally {
boolean isAsync = false;
if (context.getSessionVariable().isEnableProfile()) {
if (context.isProfileEnabled()) {
isAsync = tryProcessProfileAsync(execPlan);
if (parsedStmt.isExplain() &&
StatementBase.ExplainLevel.ANALYZE.equals(parsedStmt.getExplainLevel())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ public void cancel(String cancelledMessage) {

public abstract void setExecPlan(ExecPlan execPlan);

public abstract RuntimeProfile buildMergedQueryProfile();
public abstract RuntimeProfile buildQueryProfile(boolean needMerge);

public abstract RuntimeProfile getQueryProfile();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import com.starrocks.qe.scheduler.dag.JobSpec;
import com.starrocks.sql.plan.ExecPlan;
import com.starrocks.task.LoadEtlTask;
import com.starrocks.thrift.TPipelineProfileLevel;
import com.starrocks.thrift.TReportExecStatusParams;
import com.starrocks.thrift.TSinkCommitInfo;
import com.starrocks.thrift.TTabletCommitInfo;
Expand Down Expand Up @@ -267,15 +266,15 @@ public void updateProfile(FragmentInstanceExecState execState, TReportExecStatus
long now = System.currentTimeMillis();
long lastTime = lastRuntimeProfileUpdateTime.get();
if (topProfileSupplier != null && execPlan != null && connectContext != null &&
connectContext.getSessionVariable().isEnableProfile() &&
connectContext.isProfileEnabled() &&
// If it's the last done report, avoiding duplicate trigger
(!execState.isFinished() || profileDoneSignal.getLeftMarks().size() > 1) &&
// Interval * 0.95 * 1000 to allow a certain range of deviation
now - lastTime > (connectContext.getSessionVariable().getRuntimeProfileReportInterval() * 950L) &&
lastRuntimeProfileUpdateTime.compareAndSet(lastTime, now)) {
RuntimeProfile profile = topProfileSupplier.get();
ExecPlan plan = execPlan;
profile.addChild(buildMergedQueryProfile());
profile.addChild(buildQueryProfile(connectContext.needMergeProfile()));
ProfilingExecPlan profilingPlan = plan == null ? null : plan.getProfilingPlan();
ProfileManager.getInstance().pushProfile(profilingPlan, profile);
}
Expand Down Expand Up @@ -312,20 +311,8 @@ public void updateLoadInformation(FragmentInstanceExecState execState, TReportEx
}
}

public RuntimeProfile buildMergedQueryProfile() {
SessionVariable sessionVariable = connectContext.getSessionVariable();

if (!sessionVariable.isEnableProfile()) {
return queryProfile;
}

if (!jobSpec.isEnablePipeline()) {
return queryProfile;
}

int profileLevel = sessionVariable.getPipelineProfileLevel();
if (profileLevel >= TPipelineProfileLevel.DETAIL.getValue()) {
// We don't guarantee the detail level profile can work well with visualization feature.
public RuntimeProfile buildQueryProfile(boolean needMerge) {
if (!needMerge || !jobSpec.isEnablePipeline()) {
return queryProfile;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ public static JobSpec fromQuerySpec(ConnectContext context,
.descTable(descTable)
.enableStreamPipeline(false)
.isBlockQuery(false)
.needReport(context.getSessionVariable().isEnableProfile())
.needReport(context.getSessionVariable().isEnableProfile() ||
context.getSessionVariable().isEnableBigQueryProfile())
.queryGlobals(queryGlobals)
.queryOptions(queryOptions)
.commonProperties(context)
Expand Down

0 comments on commit 8057ffc

Please sign in to comment.