Skip to content

Commit

Permalink
IGNITE-24323 SQL Calcite: Add query blocking tasks executor (allows t…
Browse files Browse the repository at this point in the history
…o execute SQL inside UDF) - Fixes #11833.

Signed-off-by: Aleksey Plekhanov <[email protected]>
  • Loading branch information
alex-plekhanov committed Jan 30, 2025
1 parent 89d70d4 commit 405e62b
Show file tree
Hide file tree
Showing 20 changed files with 1,309 additions and 100 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,11 @@
import org.apache.ignite.internal.processors.query.calcite.exec.MailboxRegistry;
import org.apache.ignite.internal.processors.query.calcite.exec.MailboxRegistryImpl;
import org.apache.ignite.internal.processors.query.calcite.exec.QueryTaskExecutor;
import org.apache.ignite.internal.processors.query.calcite.exec.QueryTaskExecutorImpl;
import org.apache.ignite.internal.processors.query.calcite.exec.TimeoutService;
import org.apache.ignite.internal.processors.query.calcite.exec.TimeoutServiceImpl;
import org.apache.ignite.internal.processors.query.calcite.exec.exp.RexExecutorImpl;
import org.apache.ignite.internal.processors.query.calcite.exec.task.QueryBlockingTaskExecutor;
import org.apache.ignite.internal.processors.query.calcite.exec.task.StripedQueryTaskExecutor;
import org.apache.ignite.internal.processors.query.calcite.hint.HintsConfig;
import org.apache.ignite.internal.processors.query.calcite.message.MessageService;
import org.apache.ignite.internal.processors.query.calcite.message.MessageServiceImpl;
Expand Down Expand Up @@ -124,6 +125,7 @@
import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.Nullable;

import static org.apache.ignite.IgniteSystemProperties.getBoolean;
import static org.apache.ignite.IgniteSystemProperties.getLong;
import static org.apache.ignite.events.EventType.EVT_SQL_QUERY_EXECUTION;

Expand All @@ -141,6 +143,15 @@ public class CalciteQueryProcessor extends GridProcessorAdapter implements Query
defaults = "" + DFLT_IGNITE_CALCITE_PLANNER_TIMEOUT)
public static final String IGNITE_CALCITE_PLANNER_TIMEOUT = "IGNITE_CALCITE_PLANNER_TIMEOUT";

/**
* Use query blocking executor property name.
*/
@SystemProperty(value = "Calcite-based SQL engine. Use query blocking task executor instead of striped task " +
"executor. Query blocking executor allows to run SQL queries inside user-defined functions at the cost of " +
"some performance penalty", defaults = "" + false)
public static final String IGNITE_CALCITE_USE_QUERY_BLOCKING_TASK_EXECUTOR =
"IGNITE_CALCITE_USE_QUERY_BLOCKING_TASK_EXECUTOR";

/** */
public static final FrameworkConfig FRAMEWORK_CONFIG = Frameworks.newConfigBuilder()
.executor(new RexExecutorImpl(DataContexts.EMPTY))
Expand Down Expand Up @@ -186,8 +197,7 @@ public class CalciteQueryProcessor extends GridProcessorAdapter implements Query
private final FrameworkConfig frameworkCfg;

/** Query planner timeout. */
private final long queryPlannerTimeout = getLong(IGNITE_CALCITE_PLANNER_TIMEOUT,
DFLT_IGNITE_CALCITE_PLANNER_TIMEOUT);
private final long qryPlannerTimeout = getLong(IGNITE_CALCITE_PLANNER_TIMEOUT, DFLT_IGNITE_CALCITE_PLANNER_TIMEOUT);

/** */
private final QueryPlanCache qryPlanCache;
Expand Down Expand Up @@ -254,7 +264,9 @@ public CalciteQueryProcessor(GridKernalContext ctx) {
qryPlanCache = new QueryPlanCacheImpl(ctx);
parserMetrics = new QueryParserMetricsHolder(ctx.metric());
mailboxRegistry = new MailboxRegistryImpl(ctx);
taskExecutor = new QueryTaskExecutorImpl(ctx);
taskExecutor = getBoolean(IGNITE_CALCITE_USE_QUERY_BLOCKING_TASK_EXECUTOR)
? new QueryBlockingTaskExecutor(ctx)
: new StripedQueryTaskExecutor(ctx);
executionSvc = new ExecutionServiceImpl<>(ctx, ArrayRowHandler.INSTANCE);
partSvc = new AffinityServiceImpl(ctx);
msgSvc = new MessageServiceImpl(ctx);
Expand Down Expand Up @@ -666,7 +678,7 @@ private <T> T processQuery(
exchangeSvc,
(q, ex) -> qryReg.unregister(q.id(), ex),
log,
queryPlannerTimeout,
qryPlannerTimeout,
timeout
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,6 @@ public abstract class AbstractNode<Row> implements Node<Row> {
/** */
protected static final int IO_BATCH_CNT = IgniteSystemProperties.getInteger(IGNITE_CALCITE_EXEC_IO_BATCH_CNT, 4);

/** for debug purpose */
private volatile Thread thread;

/**
* {@link Inbox} node may not have proper context at creation time in case it
* creates on first message received from a remote source. This case the context
Expand Down Expand Up @@ -186,12 +183,6 @@ protected void checkState() throws Exception {
throw new QueryCancelledException("The query was timed out.");
if (Thread.interrupted())
throw new IgniteInterruptedCheckedException("Thread was interrupted.");
if (!U.assertionsEnabled())
return;
if (thread == null)
thread = Thread.currentThread();
else
assert thread == Thread.currentThread();
}

/** */
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.processors.query.calcite.exec.task;

import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.query.calcite.exec.QueryTaskExecutor;
import org.apache.ignite.internal.processors.query.calcite.util.AbstractService;
import org.apache.ignite.internal.processors.security.SecurityContext;
import org.apache.ignite.internal.util.typedef.internal.U;

/**
* Abstract query task executor.
*/
public abstract class AbstractQueryTaskExecutor extends AbstractService implements QueryTaskExecutor, Thread.UncaughtExceptionHandler {
/** */
public static final String THREAD_POOL_NAME = "CalciteQueryExecutor";

/** */
protected final GridKernalContext ctx;

/** */
protected Thread.UncaughtExceptionHandler eHnd;

/** */
protected AbstractQueryTaskExecutor(GridKernalContext ctx) {
super(ctx);
this.ctx = ctx;
}

/** {@inheritDoc} */
@Override public void uncaughtException(Thread t, Throwable e) {
if (eHnd != null)
eHnd.uncaughtException(t, e);
}

/** {@inheritDoc} */
@Override public void onStart(GridKernalContext ctx) {
eHnd = ctx.uncaughtExceptionHandler();

super.onStart(ctx);
}

/** */
protected class SecurityAwareTask implements Runnable {
/** */
private final SecurityContext secCtx;

/** */
private final Runnable qryTask;

/** */
public SecurityAwareTask(SecurityContext secCtx, Runnable qryTask) {
this.secCtx = secCtx;
this.qryTask = qryTask;
}

/** {@inheritDoc} */
@Override public void run() {
try (AutoCloseable ignored = ctx.security().withContext(secCtx)) {
qryTask.run();
}
catch (Throwable e) {
U.warn(log, "Uncaught exception", e);

/*
* No exceptions are rethrown here to preserve the current thread from being destroyed,
* because other queries may be pinned to the current thread id.
* However, unrecoverable errors must be processed by FailureHandler.
*/
uncaughtException(Thread.currentThread(), e);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.processors.query.calcite.exec.task;

/**
* Query aware task.
*/
interface QueryAwareTask extends Runnable {
/** */
public QueryKey queryKey();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.processors.query.calcite.exec.task;

import java.util.UUID;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.managers.communication.GridIoPolicy;
import org.apache.ignite.internal.processors.security.SecurityContext;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.thread.IgniteThreadPoolExecutor;

import static org.apache.ignite.internal.processors.metric.impl.MetricUtils.metricName;
import static org.apache.ignite.internal.processors.pool.PoolProcessor.THREAD_POOLS;

/**
* Query task executor based on queue with query blocking.
*/
public class QueryBlockingTaskExecutor extends AbstractQueryTaskExecutor {
/** */
private final QueryTasksQueue tasksQueue = new QueryTasksQueue();

/** */
private IgniteThreadPoolExecutor executor;

/** */
public QueryBlockingTaskExecutor(GridKernalContext ctx) {
super(ctx);
}

/** {@inheritDoc} */
@Override public void execute(UUID qryId, long fragmentId, Runnable qryTask) {
SecurityContext secCtx = ctx.security().securityContext();

QueryKey qryKey = new QueryKey(qryId, fragmentId);

executor.execute(new QueryAndSecurityAwareTask(qryKey, secCtx, qryTask));
}

/** {@inheritDoc} */
@Override public void onStart(GridKernalContext ctx) {
super.onStart(ctx);

executor = new IgniteThreadPoolExecutor(
"calciteQry",
ctx.igniteInstanceName(),
ctx.config().getQueryThreadPoolSize(),
ctx.config().getQueryThreadPoolSize(),
IgniteConfiguration.DFLT_THREAD_KEEP_ALIVE_TIME,
tasksQueue.blockingQueue(),
GridIoPolicy.CALLER_THREAD,
eHnd
) {
@Override protected void afterExecute(Runnable r, Throwable t) {
tasksQueue.unblockQuery(((QueryAwareTask)r).queryKey());

super.afterExecute(r, t);
}
};

// Prestart threads to ensure that all threads always use queue to poll tasks (without this call worker can
// get its first task directly from 'execute' method, bypassing tasks queue).
executor.prestartAllCoreThreads();

executor.registerMetrics(ctx.metric().registry(metricName(THREAD_POOLS, THREAD_POOL_NAME)));
}

/** {@inheritDoc} */
@Override public void tearDown() {
U.shutdownNow(getClass(), executor, log);
}

/** */
private class QueryAndSecurityAwareTask extends SecurityAwareTask implements QueryAwareTask {
/** */
private final QueryKey qryKey;

/** */
public QueryAndSecurityAwareTask(QueryKey qryKey, SecurityContext secCtx, Runnable qryTask) {
super(secCtx, qryTask);

this.qryKey = qryKey;
}

/** {@inheritDoc} */
@Override public QueryKey queryKey() {
return qryKey;
}

/** {@inheritDoc} */
@Override public String toString() {
return S.toString(QueryAndSecurityAwareTask.class, this);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.processors.query.calcite.exec.task;

import java.util.Objects;
import java.util.UUID;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;

/**
* Query key.
*/
class QueryKey {
/** */
private final UUID qryId;

/** */
private final long fragmentId;

/** */
QueryKey(UUID qryId, long fragmentId) {
this.qryId = qryId;
this.fragmentId = fragmentId;
}

/** {@inheritDoc} */
@Override public boolean equals(Object o) {
if (this == o)
return true;

if (o == null || getClass() != o.getClass())
return false;

QueryKey key = (QueryKey)o;

return fragmentId == key.fragmentId && Objects.equals(qryId, key.qryId);
}

/** {@inheritDoc} */
@Override public int hashCode() {
return U.safeAbs(31 * (31 + (qryId != null ? qryId.hashCode() : 0)) + Long.hashCode(fragmentId));
}

/** {@inheritDoc} */
@Override public String toString() {
return S.toString(QueryKey.class, this);
}
}
Loading

0 comments on commit 405e62b

Please sign in to comment.