Skip to content

Commit

Permalink
IGNITE-24336 SQL Calcite: Add support of user-defined table functions -
Browse files Browse the repository at this point in the history
Fixes #11832.

Signed-off-by: Aleksey Plekhanov <[email protected]>
  • Loading branch information
Vladsz83 authored and alex-plekhanov committed Jan 31, 2025
1 parent 405e62b commit 1a75353
Show file tree
Hide file tree
Showing 17 changed files with 807 additions and 26 deletions.
17 changes: 17 additions & 0 deletions docs/_docs/SQL/custom-sql-func.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,21 @@ Once you have deployed a cache with the above configuration, you can call the cu
include::{javaFile}[tags=sql-function-query, indent=0]
----


Custom SQL function can be a table function. Result of table function is treated as a row set (a table) and can be used
by other SQL operators. Custom SQL function is also a `public` method marked by annotation `@QuerySqlTableFunction`.
Table function must return an `Iterable` as a row set. Each row can be represented by an `Object[]` or by a `Collection`.
Row length must match the defined number of column types. Row value types must match the defined column types or be able
assigned to them.

[source,java]
----
include::{javaFile}[tags=sql-table-function-example, indent=0]
----
[source,java]
----
include::{javaFile}[tags=sql-table-function-config-query, indent=0]
----
NOTE: The table functions also are available currently only with link:SQL/sql-calcite[Calcite, window=_blank].

NOTE: Classes registered with `CacheConfiguration.setSqlFunctionClasses(...)` must be added to the classpath of all the nodes where the defined custom functions might be executed. Otherwise, you will get a `ClassNotFoundException` error when trying to execute the custom function.
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,18 @@ public static int sqr(int x) {

// end::sql-function-example[]

// tag::sql-table-function-example[]
static class SqlTableFunctions {
@QuerySqlTableFunction(columnTypes = {Integer.class, String.class}, columnNames = {"INT_COL", "STR_COL"})
public static Iterable<Object[]> table_function(int i) {
return Arrays.asList(
new Object[] {i, "" + i},
new Object[] {i * 10, "empty"}
);
}
}
// end::sql-table-function-example[]

@Test
IgniteCache setSqlFunction(Ignite ignite) {

Expand All @@ -181,6 +193,23 @@ IgniteCache setSqlFunction(Ignite ignite) {
return cache;
}

@Test
IgniteCache testSqlTableFunction(Ignite ignite) {
// tag::sql-table-function-config-query[]
CacheConfiguration cfg = new CacheConfiguration("myCache");

cfg.setSqlFunctionClasses(SqlTableFunctions.class);

IgniteCache cache = ignite.createCache(cfg);

SqlFieldsQuery query = new SqlFieldsQuery("SELECT STR_COL FROM TABLE_FUNCTION(10) WHERE INT_COL > 50");

cache.query(query).getAll();
// end::sql-table-function-config-query[]

return cache;
}

void call(IgniteCache cache) {

// tag::sql-function-query[]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -662,13 +662,13 @@ else if (rel instanceof Intersect)

/** {@inheritDoc} */
@Override public Node<Row> visit(IgniteTableFunctionScan rel) {
Supplier<Iterable<Object[]>> dataSupplier = expressionFactory.execute(rel.getCall());
Supplier<Iterable<?>> dataSupplier = expressionFactory.execute(rel.getCall());

RelDataType rowType = rel.getRowType();

RowFactory<Row> rowFactory = ctx.rowHandler().factory(ctx.getTypeFactory(), rowType);

return new ScanNode<>(ctx, rowType, new TableFunctionScan<>(dataSupplier, rowFactory));
return new ScanNode<>(ctx, rowType, new TableFunctionScan<>(rowType, dataSupplier, rowFactory));
}

/** {@inheritDoc} */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,30 +17,55 @@

package org.apache.ignite.internal.processors.query.calcite.exec;

import java.util.Collection;
import java.util.Iterator;
import java.util.function.Supplier;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
import org.apache.ignite.internal.processors.query.calcite.exec.RowHandler.RowFactory;
import org.apache.ignite.internal.util.typedef.F;

/** */
public class TableFunctionScan<Row> implements Iterable<Row> {
/** */
private final Supplier<Iterable<Object[]>> dataSupplier;
private final RelDataType rowType;

/** */
private final Supplier<Iterable<?>> dataSupplier;

/** */
private final RowFactory<Row> rowFactory;

/** */
public TableFunctionScan(
Supplier<Iterable<Object[]>> dataSupplier,
RelDataType rowType,
Supplier<Iterable<?>> dataSupplier,
RowFactory<Row> rowFactory
) {
this.rowType = rowType;
this.dataSupplier = dataSupplier;
this.rowFactory = rowFactory;
}

/** {@inheritDoc} */
@Override public Iterator<Row> iterator() {
return F.iterator(dataSupplier.get(), rowFactory::create, true);
return F.iterator(dataSupplier.get(), this::convertToRow, true);
}

/** */
private Row convertToRow(Object rowContainer) {
if (rowContainer.getClass() != Object[].class && !Collection.class.isAssignableFrom(rowContainer.getClass()))
throw new IgniteSQLException("Unable to process table function data: row type is neither Collection or Object[].");

Object[] rowArr = rowContainer.getClass() == Object[].class
? (Object[])rowContainer
: ((Collection<?>)rowContainer).toArray();

if (rowArr.length != rowType.getFieldCount()) {
throw new IgniteSQLException("Unable to process table function data: row length [" + rowArr.length
+ "] doesn't match defined columns number [" + rowType.getFieldCount() + "].");
}

return rowFactory.create(rowArr);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.query.calcite.exec.exp;

import java.lang.reflect.Method;
import org.apache.calcite.schema.impl.ReflectiveFunctionBase;

/** A base for outer java-method functions. */
abstract class IgniteReflectiveFunctionBase extends ReflectiveFunctionBase implements ImplementableFunction {
/** */
protected final CallImplementor implementor;

/** */
protected IgniteReflectiveFunctionBase(Method method, CallImplementor implementor) {
super(method);

this.implementor = implementor;
}

/** {@inheritDoc} */
@Override public CallImplementor getImplementor() {
return implementor;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,16 @@
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.schema.ScalarFunction;
import org.apache.calcite.schema.impl.ReflectiveFunctionBase;

/**
* Implementation of {@link ScalarFunction} for Ignite user defined functions.
*/
public class IgniteScalarFunction extends ReflectiveFunctionBase implements ScalarFunction, ImplementableFunction {
/** Implementor. */
private final CallImplementor implementor;

public class IgniteScalarFunction extends IgniteReflectiveFunctionBase implements ScalarFunction {
/**
* Private constructor.
*/
private IgniteScalarFunction(Method method, CallImplementor implementor) {
super(method);

this.implementor = implementor;
super(method, implementor);
}

/**
Expand All @@ -56,9 +50,4 @@ public static ScalarFunction create(Method method) {
@Override public RelDataType getReturnType(RelDataTypeFactory typeFactory) {
return typeFactory.createJavaType(method.getReturnType());
}

/** {@inheritDoc} */
@Override public CallImplementor getImplementor() {
return implementor;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.query.calcite.exec.exp;

import java.lang.reflect.Method;
import java.lang.reflect.Type;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.calcite.adapter.enumerable.NullPolicy;
import org.apache.calcite.adapter.java.JavaTypeFactory;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.schema.TableFunction;
import org.apache.ignite.cache.query.annotations.QuerySqlTableFunction;
import org.apache.ignite.internal.processors.query.IgniteSQLException;
import org.apache.ignite.internal.util.typedef.F;

/**
* Holder of user-defined table function.
*
* @see QuerySqlTableFunction
*/
public class IgniteTableFunction extends IgniteReflectiveFunctionBase implements TableFunction {
/** Column types of the returned table representation. */
private final Class<?>[] colTypes;

/** Column names of the returned table representation. */
private final List<String> colNames;

/**
* Creates user-defined table function holder.
*
* @param method The implementatng method.
* @param colTypes Column types of the returned table representation.
* @param colNames Column names of the returned table representation.
* @param implementor Call implementor.
*/
private IgniteTableFunction(Method method, Class<?>[] colTypes, String[] colNames, CallImplementor implementor) {
super(method, implementor);

validate(method, colTypes, colNames);

this.colTypes = colTypes;
this.colNames = Arrays.asList(colNames);
}

/**
* Creates user-defined table function implementor and holder.
*
* @param method The implementating method.
* @param colTypes Column types of the returned table representation.
* @param colNames Column names of the returned table representation.
*/
public static IgniteTableFunction create(Method method, Class<?>[] colTypes, String[] colNames) {
NotNullImplementor implementor = new ReflectiveCallNotNullImplementor(method);

CallImplementor impl = RexImpTable.createImplementor(implementor, NullPolicy.NONE, false);

return new IgniteTableFunction(method, colTypes, colNames, impl);
}

/** {@inheritDoc} */
@Override public RelDataType getRowType(RelDataTypeFactory typeFactory, List<?> arguments) {
JavaTypeFactory tf = (JavaTypeFactory)typeFactory;

List<RelDataType> converted = Stream.of(colTypes).map(cl -> tf.toSql(tf.createType(cl))).collect(Collectors.toList());

return typeFactory.createStructType(converted, colNames);
}

/** {@inheritDoc} */
@Override public Type getElementType(List<?> arguments) {
// Calcite's {@link TableFunctionImpl} does real invocation ({@link TableFunctionImpl#apply(List)}) to determine
// the type. The call might be long, 'heavy', may affect some metrics and should not be executed at validation/planning.
// We may check the argument number here but not their types. The types might be wrong, but converted further.
if (F.isEmpty(arguments) && !F.isEmpty(method.getParameterTypes())
|| F.isEmpty(method.getParameterTypes()) && !F.isEmpty(arguments)
|| method.getParameterTypes().length != arguments.size()) {
throw new IllegalArgumentException("Wrong arguments number: " + arguments.size() + ". Expected: "
+ method.getParameterTypes().length + '.');
}

return Iterable.class;
}

/** Validates the parameters and throws an exception if it finds an incorrect parameter. */
private static void validate(Method mtd, Class<?>[] colTypes, String[] colNames) {
if (F.isEmpty(colTypes))
raiseValidationError(mtd, "Column types cannot be empty.");

if (F.isEmpty(colNames))
raiseValidationError(mtd, "Column names cannot be empty.");

if (colTypes.length != colNames.length) {
raiseValidationError(mtd, "Number of the table column names [" + colNames.length
+ "] must match the number of column types [" + colTypes.length + "].");
}

if (new HashSet<>(Arrays.asList(colNames)).size() != colNames.length)
raiseValidationError(mtd, "One or more column names is not unique.");

if (!Iterable.class.isAssignableFrom(mtd.getReturnType()))
raiseValidationError(mtd, "The method is expected to return a collection (iterable).");
}

/**
* Throws a parameter validation exception with a standard text prefix.
*
* @param method A java-method implementing related user-defined table function.
* @param errPostfix Error text postfix.
*/
private static void raiseValidationError(Method method, String errPostfix) {
String mtdSign = method.getName() + '(' + Stream.of(method.getParameterTypes()).map(Class::getSimpleName)
.collect(Collectors.joining(", ")) + ')';

throw new IgniteSQLException("Unable to create table function for method '" + mtdSign + "'. " + errPostfix);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.ignite.internal.processors.query.QueryField;
import org.apache.ignite.internal.processors.query.QueryUtils;
import org.apache.ignite.internal.processors.query.calcite.exec.exp.IgniteScalarFunction;
import org.apache.ignite.internal.processors.query.calcite.exec.exp.IgniteTableFunction;
import org.apache.ignite.internal.processors.query.calcite.trait.TraitUtils;
import org.apache.ignite.internal.processors.query.calcite.type.IgniteTypeFactory;
import org.apache.ignite.internal.processors.query.calcite.util.AbstractService;
Expand Down Expand Up @@ -357,6 +358,21 @@ private static Object affinityIdentity(CacheConfiguration<?, ?> ccfg) {
rebuild();
}

/** {@inheritDoc} */
@Override public void onTableFunctionCreated(
String schemaName,
String name,
Method method,
Class<?>[] colTypes,
String[] colNames
) {
IgniteSchema schema = igniteSchemas.computeIfAbsent(schemaName, IgniteSchema::new);

schema.addFunction(name.toUpperCase(), IgniteTableFunction.create(method, colTypes, colNames));

rebuild();
}

/** {@inheritDoc} */
@Override public void onSystemViewCreated(String schemaName, SystemView<?> sysView) {
IgniteSchema schema = igniteSchemas.computeIfAbsent(schemaName, IgniteSchema::new);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -334,5 +334,18 @@ public Employer(String name, Double salary) {
this.name = name;
this.salary = salary;
}

/** {@inheritDoc} */
@Override public boolean equals(Object o) {
if (this == o)
return true;

if (o == null || getClass() != o.getClass())
return false;

Employer employer = (Employer)o;

return name.equals(employer.name) && salary.equals(employer.salary);
}
}
}
Loading

0 comments on commit 1a75353

Please sign in to comment.