Skip to content

Commit

Permalink
Blog post: Custom traits in Apache Calcite
Browse files Browse the repository at this point in the history
devozerov committed Jan 4, 2021
1 parent 0fd161a commit 1bb7d1c
Showing 7 changed files with 497 additions and 0 deletions.
58 changes: 58 additions & 0 deletions 02-custom-calcite-trait/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
<?xml version="1.0" encoding="UTF-8"?>

<!-- Maven build descriptor for the "Custom Calcite Trait" example module. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<!-- Project coordinates; the module is packaged as a plain jar. -->
<name>Custom Calcite Trait</name>
<groupId>com.querifylabs.blog</groupId>
<artifactId>custom-calcite-trait</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<url>https://www.querifylabs.com</url>

<!-- Build settings (UTF-8 sources, Java 1.8 source/target) and the dependency versions
     referenced below through ${...} placeholders. -->
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<calcite.version>1.25.0</calcite.version>
<guava.version>30.0-jre</guava.version>
<junit.version>4.11</junit.version>
</properties>

<dependencies>
<!-- Apache Calcite core library. -->
<dependency>
<groupId>org.apache.calcite</groupId>
<artifactId>calcite-core</artifactId>
<version>${calcite.version}</version>
</dependency>

<!-- Guava, declared explicitly with its own pinned version. -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>

<!-- JUnit is limited to the test scope. -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<!-- Pins the versions of the compiler and surefire plugins. -->
<pluginManagement>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
</plugin>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.22.1</version>
</plugin>
</plugins>
</pluginManagement>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package com.querifylabs.blog.trait;

import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelTrait;
import org.apache.calcite.plan.RelTraitDef;

/**
 * Planner trait with three possible values: {@link #ANY}, {@link #PARTITIONED} and
 * {@link #SINGLETON}.
 *
 * <p>The constructor is private, so the three constants declared here are the only instances.
 */
public class Distribution implements RelTrait {

    /** Trait value of type {@link Type#ANY}; every distribution satisfies it. */
    public static final Distribution ANY = new Distribution(Type.ANY);

    /** Trait value of type {@link Type#PARTITIONED}. */
    public static final Distribution PARTITIONED = new Distribution(Type.PARTITIONED);

    /** Trait value of type {@link Type#SINGLETON}. */
    public static final Distribution SINGLETON = new Distribution(Type.SINGLETON);

    /** Kind of this distribution. */
    private final Type type;

    private Distribution(Type type) {
        this.type = type;
    }

    /**
     * @return the shared {@link DistributionTraitDef} instance
     */
    @SuppressWarnings("rawtypes")
    @Override
    public RelTraitDef getTraitDef() {
        return DistributionTraitDef.INSTANCE;
    }

    /**
     * Checks whether this distribution can be used where {@code toTrait} is required.
     *
     * @param toTrait the required trait; it is cast to {@link Distribution} without a type check
     * @return {@code true} if the required type is {@link Type#ANY} or equals this trait's type
     */
    @Override
    public boolean satisfies(RelTrait toTrait) {
        Type required = ((Distribution) toTrait).type;

        // Enum constants are singletons, so identity comparison is equivalent to equals().
        return required == Type.ANY || type == required;
    }

    /** Does nothing. */
    @Override
    public void register(RelOptPlanner planner) {
        // Intentionally empty.
    }

    /**
     * @return the name of the underlying {@link Type} constant
     */
    @Override
    public String toString() {
        return type.name();
    }

    /** The possible kinds of {@link Distribution}. */
    enum Type {
        ANY,
        PARTITIONED,
        SINGLETON
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package com.querifylabs.blog.trait;

import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelTraitDef;
import org.apache.calcite.rel.RelNode;

/**
 * Trait definition for {@link Distribution}.
 *
 * <p>The class is a singleton: the constructor is private and {@link #INSTANCE} is the only
 * instance.
 */
public class DistributionTraitDef extends RelTraitDef<Distribution> {

    /**
     * The only instance of this trait definition.
     *
     * <p>Declared {@code final} so that the singleton cannot be reassigned from outside the class.
     */
    public static final DistributionTraitDef INSTANCE = new DistributionTraitDef();

    private DistributionTraitDef() {
        // No-op.
    }

    /**
     * @return {@code Distribution.class}
     */
    @Override
    public Class<Distribution> getTraitClass() {
        return Distribution.class;
    }

    /**
     * @return the constant name {@code "DISTRIBUTION"}
     */
    @Override
    public String getSimpleName() {
        return "DISTRIBUTION";
    }

    /**
     * Produces a node that has the requested distribution.
     *
     * @param planner the planner; not used by this implementation
     * @param rel the node to convert
     * @param toTrait the requested distribution
     * @param allowInfiniteCostConverters not used by this implementation
     * @return {@code rel} itself when its current distribution already satisfies {@code toTrait};
     *     otherwise a new {@link ExchangeRel} on top of {@code rel} whose trait set is the trait
     *     set of {@code rel} plus {@code toTrait}
     */
    @Override
    public RelNode convert(
        RelOptPlanner planner,
        RelNode rel,
        Distribution toTrait,
        boolean allowInfiniteCostConverters
    ) {
        Distribution fromTrait = rel.getTraitSet().getTrait(DistributionTraitDef.INSTANCE);

        // Nothing to enforce: the input already provides the requested distribution.
        if (fromTrait.satisfies(toTrait)) {
            return rel;
        }

        return new ExchangeRel(
            rel.getCluster(),
            rel.getTraitSet().plus(toTrait),
            rel
        );
    }

    /**
     * @return always {@code true}; any pair of distributions is reported as convertible
     */
    @Override
    public boolean canConvert(
        RelOptPlanner planner,
        Distribution fromTrait,
        Distribution toTrait
    ) {
        return true;
    }

    /**
     * @return {@link Distribution#ANY}
     */
    @Override
    public Distribution getDefault() {
        return Distribution.ANY;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package com.querifylabs.blog.trait;

import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.SingleRel;

import java.util.List;

/**
 * Single-input relational node that adds no state of its own on top of {@link SingleRel}.
 */
public class ExchangeRel extends SingleRel {

    /**
     * @param cluster the cluster passed to the {@link SingleRel} constructor
     * @param traits the trait set of this node
     * @param input the only input of this node
     */
    public ExchangeRel(RelOptCluster cluster, RelTraitSet traits, RelNode input) {
        super(cluster, traits, input);
    }

    /**
     * Creates a new {@link ExchangeRel} in the same cluster.
     *
     * @param traitSet the trait set of the copy
     * @param inputs the inputs of the copy; only the first element is used
     * @return the new node
     */
    @Override
    public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
        RelNode newInput = inputs.get(0);

        return new ExchangeRel(getCluster(), traitSet, newInput);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package com.querifylabs.blog.trait;

import org.apache.calcite.schema.SchemaVersion;
import org.apache.calcite.schema.impl.AbstractSchema;

import java.util.HashMap;
import java.util.Map;

/**
 * Named schema backed by a fixed map of tables. Instances are created through {@link Builder}.
 */
public class Schema extends AbstractSchema {

    private final String schemaName;
    private final Map<String, org.apache.calcite.schema.Table> tableMap;

    private Schema(String schemaName, Map<String, org.apache.calcite.schema.Table> tableMap) {
        this.schemaName = schemaName;
        this.tableMap = tableMap;
    }

    /**
     * @return the name given to the builder that created this schema
     */
    public String getSchemaName() {
        return schemaName;
    }

    /**
     * @return the tables of this schema, keyed by table name
     */
    @Override
    public Map<String, org.apache.calcite.schema.Table> getTableMap() {
        return tableMap;
    }

    /**
     * @param version ignored
     * @return this schema itself
     */
    @Override
    public org.apache.calcite.schema.Schema snapshot(SchemaVersion version) {
        return this;
    }

    /**
     * Starts building a schema.
     *
     * @param schemaName the schema name
     * @return a new builder
     * @throws IllegalArgumentException if {@code schemaName} is {@code null} or empty
     */
    public static Builder newBuilder(String schemaName) {
        return new Builder(schemaName);
    }

    /** Collects tables and produces a {@link Schema}. */
    public static final class Builder {

        private final String schemaName;
        private final Map<String, org.apache.calcite.schema.Table> tableMap = new HashMap<>();

        private Builder(String schemaName) {
            if (schemaName == null || schemaName.isEmpty()) {
                throw new IllegalArgumentException("Schema name cannot be null or empty");
            }

            this.schemaName = schemaName;
        }

        /**
         * Registers a table under its own name.
         *
         * @param table the table to add
         * @return this builder
         * @throws IllegalArgumentException if a table with the same name was already added
         */
        public Builder addTable(Table table) {
            if (tableMap.containsKey(table.getTableName())) {
                throw new IllegalArgumentException("Table already defined: " + table.getTableName());
            }

            tableMap.put(table.getTableName(), table);

            return this;
        }

        /**
         * Creates the schema from the tables added so far.
         *
         * @return a new schema holding a snapshot of the added tables
         */
        public Schema build() {
            // Copy the map so that further addTable() calls on this builder
            // cannot change a schema that has already been built.
            return new Schema(schemaName, new HashMap<>(tableMap));
        }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package com.querifylabs.blog.trait;

import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.logical.LogicalTableScan;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rel.type.RelDataTypeFieldImpl;
import org.apache.calcite.rel.type.RelRecordType;
import org.apache.calcite.rel.type.StructKind;
import org.apache.calcite.schema.TranslatableTable;
import org.apache.calcite.schema.impl.AbstractTable;
import org.apache.calcite.sql.type.SqlTypeName;

import java.util.ArrayList;
import java.util.List;

/**
 * Table with a name, a {@link Distribution} and a list of typed fields. Instances are created
 * through {@link Builder}.
 */
public class Table extends AbstractTable implements TranslatableTable {

    private final String tableName;
    private final Distribution distribution;
    private final List<String> fieldNames;
    private final List<SqlTypeName> fieldTypes;

    /** Row type, created on the first {@link #getRowType(RelDataTypeFactory)} call. */
    private RelDataType rowType;

    private Table(String tableName, Distribution distribution, List<String> fieldNames, List<SqlTypeName> fieldTypes) {
        this.tableName = tableName;
        this.distribution = distribution;
        this.fieldNames = fieldNames;
        this.fieldTypes = fieldTypes;
    }

    /**
     * @return the name given to the builder that created this table
     */
    public String getTableName() {
        return tableName;
    }

    /**
     * Returns the row type of this table.
     *
     * <p>The type is built on the first call, one field per declared name/type pair in
     * declaration order, and cached. Later calls return the cached type regardless of the
     * factory passed.
     *
     * @param typeFactory factory used to create the field types on the first call
     * @return the record type of this table
     */
    @Override
    public RelDataType getRowType(RelDataTypeFactory typeFactory) {
        if (rowType == null) {
            List<RelDataTypeField> fields = new ArrayList<>(fieldNames.size());

            for (int i = 0; i < fieldNames.size(); i++) {
                RelDataType fieldType = typeFactory.createSqlType(fieldTypes.get(i));
                RelDataTypeField field = new RelDataTypeFieldImpl(fieldNames.get(i), i, fieldType);
                fields.add(field);
            }

            rowType = new RelRecordType(StructKind.PEEK_FIELDS, fields, false);
        }

        return rowType;
    }

    /**
     * Creates a {@link LogicalTableScan} whose trait set is built from this table's distribution.
     *
     * @param context supplies the cluster and the table hints
     * @param relOptTable the planner table to scan
     * @return the scan node
     */
    @Override
    public RelNode toRel(RelOptTable.ToRelContext context, RelOptTable relOptTable) {
        RelTraitSet traitSet = context.getCluster().traitSetOf(distribution);

        return new LogicalTableScan(
            context.getCluster(),
            traitSet,
            context.getTableHints(),
            relOptTable
        );
    }

    /**
     * Starts building a table.
     *
     * @param tableName the table name
     * @param distribution the distribution of the table
     * @return a new builder
     * @throws IllegalArgumentException if {@code tableName} is {@code null} or empty
     */
    public static Builder newBuilder(String tableName, Distribution distribution) {
        return new Builder(tableName, distribution);
    }

    /** Collects fields and produces a {@link Table}. */
    public static final class Builder {

        private final String tableName;
        private final Distribution distribution;
        private final List<String> fieldNames = new ArrayList<>();
        private final List<SqlTypeName> fieldTypes = new ArrayList<>();

        private Builder(String tableName, Distribution distribution) {
            if (tableName == null || tableName.isEmpty()) {
                throw new IllegalArgumentException("Table name cannot be null or empty");
            }

            this.tableName = tableName;
            this.distribution = distribution;
        }

        /**
         * Appends a field to the table.
         *
         * @param name the field name
         * @param typeName the SQL type of the field
         * @return this builder
         * @throws IllegalArgumentException if {@code name} is {@code null}, empty, or already used
         */
        public Builder addField(String name, SqlTypeName typeName) {
            if (name == null || name.isEmpty()) {
                throw new IllegalArgumentException("Field name cannot be null or empty");
            }

            if (fieldNames.contains(name)) {
                throw new IllegalArgumentException("Field already defined: " + name);
            }

            fieldNames.add(name);
            fieldTypes.add(typeName);

            return this;
        }

        /**
         * Creates the table from the fields added so far.
         *
         * @return a new table holding a snapshot of the added fields
         * @throws IllegalStateException if no field was added
         */
        public Table build() {
            if (fieldNames.isEmpty()) {
                throw new IllegalStateException("Table must have at least one field");
            }

            // Copy the lists so that further addField() calls on this builder
            // cannot change a table that has already been built.
            return new Table(
                tableName,
                distribution,
                new ArrayList<>(fieldNames),
                new ArrayList<>(fieldTypes)
            );
        }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
package com.querifylabs.blog.trait;

import org.apache.calcite.config.CalciteConnectionConfig;
import org.apache.calcite.jdbc.CalciteSchema;
import org.apache.calcite.jdbc.JavaTypeFactoryImpl;
import org.apache.calcite.plan.ConventionTraitDef;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.plan.volcano.AbstractConverter;
import org.apache.calcite.plan.volcano.VolcanoPlanner;
import org.apache.calcite.prepare.CalciteCatalogReader;
import org.apache.calcite.prepare.Prepare;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.RelFactories;
import org.apache.calcite.rel.externalize.RelWriterImpl;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.sql.SqlExplainLevel;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.tools.Programs;
import org.apache.calcite.tools.RelBuilder;
import org.apache.calcite.tools.RelBuilderFactory;
import org.apache.calcite.tools.RuleSet;
import org.apache.calcite.tools.RuleSets;
import org.junit.Test;

import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Collections;

import static org.junit.Assert.assertSame;

/**
 * Runs the planner on a scan of each test table while requesting the SINGLETON distribution,
 * and checks the distribution of the resulting plan.
 */
public class TraitTest {

    private static final String TABLE_PARTITIONED = "partitioned";
    private static final String TABLE_SINGLETON = "singleton";

    /** Requests SINGLETON on top of the table declared as PARTITIONED. */
    @Test
    public void testEnforceSingletonOnPartitioned() {
        enforceSingleton(TABLE_PARTITIONED);
    }

    /** Requests SINGLETON on top of the table declared as SINGLETON. */
    @Test
    public void testEnforceSingletonOnSingleton() {
        enforceSingleton(TABLE_SINGLETON);
    }

    /**
     * Builds a scan of {@code tableName}, asks the planner for the SINGLETON distribution, prints
     * the plan before and after, and asserts that the result carries {@link Distribution#SINGLETON}.
     */
    private static void enforceSingleton(String tableName) {
        // Supporting objects: the catalog and the planner.
        Prepare.CatalogReader catalogReader = createSchema();
        VolcanoPlanner planner = createPlanner();

        // Table scan on the requested table.
        RelOptCluster cluster = RelOptCluster.create(planner, new RexBuilder(catalogReader.getTypeFactory()));
        RelNode scan = RelBuilder.proto(RelFactories.DEFAULT_TABLE_SCAN_FACTORY)
            .create(cluster, catalogReader)
            .scan(tableName)
            .build();
        print("BEFORE", scan);

        // The built-in rule that expands abstract converters.
        RuleSet rules = RuleSets.ofList(AbstractConverter.ExpandConversionRule.INSTANCE);

        // The traits of the scan, with the distribution set to SINGLETON.
        RelTraitSet desiredTraits = scan.getTraitSet().plus(Distribution.SINGLETON);

        // Let the planner enforce the desired traits.
        RelNode optimized = Programs.of(rules).run(
            planner,
            scan,
            desiredTraits,
            Collections.emptyList(),
            Collections.emptyList()
        );
        print("AFTER", optimized);

        assertSame(Distribution.SINGLETON, optimized.getTraitSet().getTrait(DistributionTraitDef.INSTANCE));
    }

    /**
     * Creates a catalog reader over a schema named "schema" that contains one PARTITIONED table
     * and one SINGLETON table.
     */
    private static Prepare.CatalogReader createSchema() {
        Schema schema = Schema.newBuilder("schema")
            .addTable(decimalTable(TABLE_PARTITIONED, Distribution.PARTITIONED))
            .addTable(decimalTable(TABLE_SINGLETON, Distribution.SINGLETON))
            .build();

        RelDataTypeFactory typeFactory = new JavaTypeFactoryImpl();
        CalciteConnectionConfig config = CalciteConnectionConfig.DEFAULT;

        CalciteSchema rootSchema = CalciteSchema.createRootSchema(false, false);
        rootSchema.add(schema.getSchemaName(), schema);

        return new CalciteCatalogReader(
            rootSchema,
            Collections.singletonList(schema.getSchemaName()),
            typeFactory,
            config
        );
    }

    /** Creates a table with the given name and distribution and a single DECIMAL column "field". */
    private static Table decimalTable(String name, Distribution distribution) {
        return Table.newBuilder(name, distribution)
            .addField("field", SqlTypeName.DECIMAL)
            .build();
    }

    /** Creates a Volcano planner with the convention and distribution trait definitions registered. */
    private static VolcanoPlanner createPlanner() {
        VolcanoPlanner planner = new VolcanoPlanner();

        // Register the trait definitions: convention first, then distribution.
        planner.addRelTraitDef(ConventionTraitDef.INSTANCE);
        planner.addRelTraitDef(DistributionTraitDef.INSTANCE);

        // DO NOT USE IN PRODUCTION: a quirk that allows Apache Calcite to calculate costs for logical nodes.
        // Without this line we would have to use a custom convention, which would make the example more complex.
        planner.setNoneConventionHasInfiniteCost(false);

        return planner;
    }

    /** Prints {@code header}, a colon and a newline, followed by the explained plan, to standard output. */
    private static void print(String header, RelNode relTree) {
        StringWriter buffer = new StringWriter();
        buffer.write(header + ":" + "\n");

        relTree.explain(new RelWriterImpl(new PrintWriter(buffer), SqlExplainLevel.DIGEST_ATTRIBUTES, true));

        System.out.println(buffer.toString());
    }
}

0 comments on commit 1bb7d1c

Please sign in to comment.