Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IGNITE-17086 Warnings added for long lazy SQL queries #11405

Closed
wants to merge 14 commits into from
Closed
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,15 @@ public class H2QueryInfo implements TrackableQuery {
/** Begin timestamp. */
private final long beginTs;

/** The most recent point in time when the tracking of a long query was suspended. */
private volatile long lastSuspendTs;

/** External wait time. */
private volatile long extWait;

/** Long query time tracking suspension flag. */
private volatile boolean isSuspended;

/** Query schema. */
private final String schema;

Expand All @@ -65,6 +74,9 @@ public class H2QueryInfo implements TrackableQuery {
/** Query id. */
private final long queryId;

/** Lock object. */
private final Object lock = new Object();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think explicit lock is redundant, synchronized method can be used.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Corrected.


/**
* @param type Query type.
* @param stmt Query statement.
Expand Down Expand Up @@ -123,7 +135,29 @@ protected void printInfo(StringBuilder msg) {

/** {@inheritDoc} */
@Override public long time() {
return U.currentTimeMillis() - beginTs;
return (isSuspended ? lastSuspendTs : U.currentTimeMillis()) - beginTs - extWait;
}

/** */
public void suspendTracking() {
synchronized (lock) {
if (!isSuspended) {
isSuspended = true;

lastSuspendTs = U.currentTimeMillis();
}
}
}

/** */
public void resumeTracking() {
synchronized (lock) {
if (isSuspended) {
isSuspended = false;

extWait += U.currentTimeMillis() - lastSuspendTs;
}
}
}

/**
Expand Down Expand Up @@ -156,6 +190,11 @@ protected void printInfo(StringBuilder msg) {
return msgSb.toString();
}

/** */
public boolean isSuspended() {
return isSuspended;
}

/**
* Query type.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,9 @@ public abstract class H2ResultSetIterator<T> extends GridIteratorAdapter<T> impl
/** */
private final H2QueryInfo qryInfo;

/** */
final IgniteH2Indexing h2;

/**
* @param data Data array.
* @param log Logger.
Expand All @@ -141,6 +144,7 @@ protected H2ResultSetIterator(
this.data = data;
this.tracing = tracing;
this.qryInfo = qryInfo;
this.h2 = h2;

try {
res = (ResultInterface)RESULT_FIELD.get(data);
Expand Down Expand Up @@ -325,6 +329,9 @@ public void onClose() throws IgniteCheckedException {

lockTables();

if (qryInfo != null)
h2.heavyQueriesTracker().stopTracking(qryInfo, null);

try {
resultSetChecker.checkOnClose();

Expand Down Expand Up @@ -391,7 +398,7 @@ private synchronized void closeInternal() throws IgniteCheckedException {
if (closed)
return false;

return hasRow || (hasRow = fetchNext());
return hasRow || (hasRow = h2.executeWithResumableTimeTracking(this::fetchNext, qryInfo));
}

/** {@inheritDoc} */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@
import org.apache.ignite.internal.util.lang.GridPlainRunnable;
import org.apache.ignite.internal.util.lang.IgniteInClosure2X;
import org.apache.ignite.internal.util.lang.IgniteSingletonIterator;
import org.apache.ignite.internal.util.lang.IgniteThrowableSupplier;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
Expand Down Expand Up @@ -412,6 +413,8 @@ private GridQueryFieldsResult executeSelectLocal(
@Override public GridCloseableIterator<List<?>> iterator() throws IgniteCheckedException {
H2PooledConnection conn = connections().connection(qryDesc.schemaName());

H2QueryInfo qryInfo = null;

try (TraceSurroundings ignored = MTC.support(ctx.tracing().create(SQL_ITER_OPEN, MTC.span()))) {
H2Utils.setupConnection(conn, qctx,
qryDesc.distributedJoins(), qryDesc.enforceJoinOrder(), qryParams.lazy());
Expand All @@ -434,9 +437,11 @@ private GridQueryFieldsResult executeSelectLocal(

H2Utils.bindParameters(stmt, F.asList(params));

H2QueryInfo qryInfo = new H2QueryInfo(H2QueryInfo.QueryType.LOCAL, stmt, qry,
qryInfo = new H2QueryInfo(H2QueryInfo.QueryType.LOCAL, stmt, qry,
ctx.localNodeId(), qryId);

heavyQryTracker.startTracking(qryInfo);

if (ctx.performanceStatistics().enabled()) {
ctx.performanceStatistics().queryProperty(
GridCacheQueryType.SQL_FIELDS,
Expand All @@ -447,13 +452,16 @@ private GridQueryFieldsResult executeSelectLocal(
);
}

ResultSet rs = executeSqlQueryWithTimer(
stmt,
conn,
qry,
timeout,
cancel,
qryParams.dataPageScanEnabled(),
ResultSet rs = executeWithResumableTimeTracking(
() -> executeSqlQueryWithTimer(
stmt,
conn,
qry,
timeout,
cancel,
qryParams.dataPageScanEnabled(),
null
),
qryInfo
);

Expand All @@ -470,6 +478,9 @@ private GridQueryFieldsResult executeSelectLocal(
catch (IgniteCheckedException | RuntimeException | Error e) {
conn.close();

if (qryInfo != null)
heavyQryTracker.stopTracking(qryInfo, e);

throw e;
}
}
Expand Down Expand Up @@ -2253,4 +2264,25 @@ public HeavyQueriesTracker heavyQueriesTracker() {
public DistributedIndexingConfiguration distributedConfiguration() {
return distrCfg;
}

/**
* Resumes time tracking before the task (if needed) and suspends time tracking after the task is finished.
*
* @param task Query/fetch to execute.
* @param qryInfo Query info.
* @throws IgniteCheckedException If failed.
*/
public <T> T executeWithResumableTimeTracking(
IgniteThrowableSupplier<T> task,
final H2QueryInfo qryInfo
) throws IgniteCheckedException {
qryInfo.resumeTracking();

try {
return task.get();
}
finally {
qryInfo.suspendTracking();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,8 @@ private void onQueryRequest0(

qryResults.addResult(qryIdx, res);

MapH2QueryInfo qryInfo = null;

try {
res.lock();

Expand All @@ -460,7 +462,9 @@ private void onQueryRequest0(

H2Utils.bindParameters(stmt, params0);

MapH2QueryInfo qryInfo = new MapH2QueryInfo(stmt, qry.query(), node.id(), qryId, reqId, segmentId);
qryInfo = new MapH2QueryInfo(stmt, qry.query(), node.id(), qryId, reqId, segmentId);

h2.heavyQueriesTracker().startTracking(qryInfo);

if (performanceStatsEnabled) {
ctx.performanceStatistics().queryProperty(
Expand All @@ -472,14 +476,20 @@ private void onQueryRequest0(
);
}

ResultSet rs = h2.executeSqlQueryWithTimer(
stmt,
conn,
sql,
timeout,
qryResults.queryCancel(qryIdx),
dataPageScanEnabled,
qryInfo);
GridQueryCancel qryCancel = qryResults.queryCancel(qryIdx);

ResultSet rs = h2.executeWithResumableTimeTracking(
() -> h2.executeSqlQueryWithTimer(
stmt,
conn,
sql,
timeout,
qryCancel,
dataPageScanEnabled,
null
),
qryInfo
);

if (evt) {
ctx.event().record(new CacheQueryExecutedEvent<>(
Expand Down Expand Up @@ -507,14 +517,21 @@ private void onQueryRequest0(

res.openResult(rs, qryInfo);

final GridQueryNextPageResponse msg = prepareNextPage(
nodeRess,
node,
qryResults,
qryIdx,
segmentId,
pageSize,
dataPageScanEnabled
MapQueryResults qryResults0 = qryResults;

int qryIdx0 = qryIdx;

final GridQueryNextPageResponse msg = h2.executeWithResumableTimeTracking(
() -> prepareNextPage(
nodeRess,
node,
qryResults0,
qryIdx0,
segmentId,
pageSize,
dataPageScanEnabled
),
qryInfo
);

if (msg != null)
Expand All @@ -528,6 +545,12 @@ private void onQueryRequest0(

qryIdx++;
}
catch (Throwable e) {
if (qryInfo != null)
h2.heavyQueriesTracker().stopTracking(qryInfo, e);

throw e;
}
finally {
try {
res.unlockTables();
Expand Down Expand Up @@ -862,14 +885,18 @@ else if (qryResults.cancelled())

Boolean dataPageScanEnabled = isDataPageScanEnabled(req.getFlags());

GridQueryNextPageResponse msg = prepareNextPage(
nodeRess,
node,
qryResults,
req.query(),
req.segmentId(),
req.pageSize(),
dataPageScanEnabled);
GridQueryNextPageResponse msg = h2.executeWithResumableTimeTracking(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tracking is not stopped in case of exception here

Copy link
Contributor Author

@oleg-vlsk oleg-vlsk Jul 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stopping the time tracking is added to the catch section.

() -> prepareNextPage(
nodeRess,
node,
qryResults,
req.query(),
req.segmentId(),
req.pageSize(),
dataPageScanEnabled
),
res.qryInfo()
);

if (msg != null)
sendNextPage(node, msg);
Expand Down Expand Up @@ -939,6 +966,9 @@ private GridQueryNextPageResponse prepareNextPage(
if (last) {
qr.closeResult(qry);

if (res.qryInfo() != null)
h2.heavyQueriesTracker().stopTracking(res.qryInfo(), null);

if (qr.isAllClosed()) {
nodeRess.remove(qr.queryRequestId(), segmentId, qr);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,8 @@ public Iterator<List<?>> query(

runs.put(qryReqId, r);

ReduceH2QueryInfo qryInfo = null;

try {
cancel.add(() -> send(nodes, new GridQueryCancelRequest(qryReqId), null, true));

Expand Down Expand Up @@ -509,9 +511,11 @@ else if (QueryUtils.wasCancelled(err))

H2Utils.bindParameters(stmt, F.asList(rdc.parameters(params)));

ReduceH2QueryInfo qryInfo = new ReduceH2QueryInfo(stmt, qry.originalSql(),
qryInfo = new ReduceH2QueryInfo(stmt, qry.originalSql(),
ctx.localNodeId(), qryId, qryReqId);

h2.heavyQueriesTracker().startTracking(qryInfo);

if (ctx.performanceStatistics().enabled()) {
ctx.performanceStatistics().queryProperty(
GridCacheQueryType.SQL_FIELDS,
Expand All @@ -522,12 +526,18 @@ else if (QueryUtils.wasCancelled(err))
);
}

ResultSet res = h2.executeSqlQueryWithTimer(stmt,
conn,
rdc.query(),
timeoutMillis,
cancel,
dataPageScanEnabled,
H2PooledConnection conn0 = conn;

ResultSet res = h2.executeWithResumableTimeTracking(
() -> h2.executeSqlQueryWithTimer(
stmt,
conn0,
rdc.query(),
timeoutMillis,
cancel,
dataPageScanEnabled,
null
),
qryInfo
);

Expand All @@ -549,6 +559,9 @@ else if (QueryUtils.wasCancelled(err))
catch (IgniteCheckedException | RuntimeException e) {
release = true;

if (qryInfo != null)
h2.heavyQueriesTracker().stopTracking(qryInfo, e);

if (e instanceof CacheException) {
if (QueryUtils.wasCancelled(e))
throw new CacheException("Failed to run reduce query locally.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,11 @@ public void checkTablesVersions() {
GridH2Table.checkTablesVersions(ses);
}

/** */
public MapH2QueryInfo qryInfo() {
return res.qryInfo;
}

/** */
private class Result {
/** */
Expand Down
Loading