Skip to content

Commit

Permalink
[CALCITE-2658] Add ExchangeRemoveConstantKeysRule that removes consta…
Browse files Browse the repository at this point in the history
…nt keys from Exchange or SortExchange (Chunwei Lei)
  • Loading branch information
chunweilei authored and hsyuan committed Mar 28, 2019
1 parent a619cae commit 0537f27
Show file tree
Hide file tree
Showing 4 changed files with 269 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
import org.apache.calcite.rel.rules.AggregateReduceFunctionsRule;
import org.apache.calcite.rel.rules.AggregateStarTableRule;
import org.apache.calcite.rel.rules.AggregateValuesRule;
import org.apache.calcite.rel.rules.ExchangeRemoveConstantKeysRule;
import org.apache.calcite.rel.rules.FilterAggregateTransposeRule;
import org.apache.calcite.rel.rules.FilterJoinRule;
import org.apache.calcite.rel.rules.FilterProjectTransposeRule;
Expand Down Expand Up @@ -234,7 +235,9 @@ public class CalcitePrepareImpl implements CalcitePrepare {
SortProjectTransposeRule.INSTANCE,
SortJoinTransposeRule.INSTANCE,
SortRemoveConstantKeysRule.INSTANCE,
SortUnionTransposeRule.INSTANCE);
SortUnionTransposeRule.INSTANCE,
ExchangeRemoveConstantKeysRule.EXCHANGE_INSTANCE,
ExchangeRemoveConstantKeysRule.SORT_EXCHANGE_INSTANCE);

private static final List<RelOptRule> CONSTANT_REDUCTION_RULES =
ImmutableList.of(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.calcite.rel.rules;

import org.apache.calcite.plan.RelOptPredicateList;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelCollations;
import org.apache.calcite.rel.RelDistribution;
import org.apache.calcite.rel.RelDistributions;
import org.apache.calcite.rel.RelFieldCollation;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Exchange;
import org.apache.calcite.rel.core.RelFactories;
import org.apache.calcite.rel.core.SortExchange;
import org.apache.calcite.rel.logical.LogicalExchange;
import org.apache.calcite.rel.logical.LogicalSortExchange;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.calcite.rex.RexInputRef;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/**
* Planner rule that removes keys from
* a {@link Exchange} if those keys are known to be constant.
*
* <p>For example,
* <code>SELECT key,value FROM (SELECT 1 AS key, value FROM src) r DISTRIBUTE
* BY key</code> can be reduced to
* <code>SELECT 1 AS key, value FROM src</code>.</p>
*
*/
public class ExchangeRemoveConstantKeysRule extends RelOptRule {
/**
* Singleton rule that removes constants inside a
* {@link LogicalExchange}.
*/
public static final ExchangeRemoveConstantKeysRule EXCHANGE_INSTANCE =
new ExchangeRemoveConstantKeysRule(LogicalExchange.class,
"ExchangeRemoveConstantKeysRule");

/**
* Singleton rule that removes constants inside a
* {@link LogicalSortExchange}.
*/
public static final ExchangeRemoveConstantKeysRule SORT_EXCHANGE_INSTANCE =
new SortExchangeRemoveConstantKeysRule(LogicalSortExchange.class,
"SortExchangeRemoveConstantKeysRule");

private ExchangeRemoveConstantKeysRule(Class<? extends RelNode> clazz,
String description) {
super(operand(clazz, any()), RelFactories.LOGICAL_BUILDER, description);
}

/** Removes constant in distribution keys. */
protected static List<Integer> simplifyDistributionKeys(RelDistribution distribution,
Set<Integer> constants) {
return distribution.getKeys().stream()
.filter(key -> !constants.contains(key))
.collect(Collectors.toList());
}

@Override public boolean matches(RelOptRuleCall call) {
final Exchange exchange = call.rel(0);
return exchange.getDistribution().getType()
== RelDistribution.Type.HASH_DISTRIBUTED;
}

@Override public void onMatch(RelOptRuleCall call) {
final Exchange exchange = call.rel(0);
final RelMetadataQuery mq = call.getMetadataQuery();
final RelNode input = exchange.getInput();
final RelOptPredicateList predicates = mq.getPulledUpPredicates(input);
if (predicates == null) {
return;
}

final Set<Integer> constants = new HashSet<>();
predicates.constantMap.keySet().forEach(key -> {
if (key instanceof RexInputRef) {
constants.add(((RexInputRef) key).getIndex());
}
});
if (constants.isEmpty()) {
return;
}

final List<Integer> distributionKeys = simplifyDistributionKeys(
exchange.getDistribution(), constants);

if (distributionKeys.size() != exchange.getDistribution().getKeys()
.size()) {
call.transformTo(call.builder()
.push(exchange.getInput())
.exchange(distributionKeys.isEmpty()
?
RelDistributions.SINGLETON
:
RelDistributions.hash(distributionKeys))
.build());
call.getPlanner().setImportance(exchange, 0.0);
}
}

/**
* Rule that reduces constants inside a {@link SortExchange}.
*/
public static class SortExchangeRemoveConstantKeysRule
extends ExchangeRemoveConstantKeysRule {

private SortExchangeRemoveConstantKeysRule(Class<? extends RelNode> clazz,
String description) {
super(clazz, description);
}

@Override public boolean matches(RelOptRuleCall call) {
final SortExchange sortExchange = call.rel(0);
return sortExchange.getDistribution().getType()
== RelDistribution.Type.HASH_DISTRIBUTED
|| !sortExchange.getCollation().getFieldCollations().isEmpty();
}

@Override public void onMatch(RelOptRuleCall call) {
final SortExchange sortExchange = call.rel(0);
final RelMetadataQuery mq = call.getMetadataQuery();
final RelNode input = sortExchange.getInput();
final RelOptPredicateList predicates = mq.getPulledUpPredicates(input);
if (predicates == null) {
return;
}

final Set<Integer> constants = new HashSet<>();
predicates.constantMap.keySet().forEach(key -> {
if (key instanceof RexInputRef) {
constants.add(((RexInputRef) key).getIndex());
}
});

if (constants.isEmpty()) {
return;
}

List<Integer> distributionKeys = new ArrayList<>();
boolean distributionSimplified = false;
boolean hashDistribution = sortExchange.getDistribution().getType()
== RelDistribution.Type.HASH_DISTRIBUTED;
if (hashDistribution) {
distributionKeys = simplifyDistributionKeys(
sortExchange.getDistribution(), constants);
distributionSimplified =
distributionKeys.size() != sortExchange.getDistribution().getKeys()
.size();
}

final List<RelFieldCollation> fieldCollations = sortExchange
.getCollation().getFieldCollations().stream().filter(
fc -> !constants.contains(fc.getFieldIndex()))
.collect(Collectors.toList());

boolean collationSimplified =
fieldCollations.size() != sortExchange.getCollation()
.getFieldCollations().size();
if (distributionSimplified
|| collationSimplified) {
RelDistribution distribution = distributionSimplified
?
distributionKeys.isEmpty()
?
RelDistributions.SINGLETON
:
RelDistributions.hash(distributionKeys)
:
sortExchange.getDistribution();
RelCollation collation = collationSimplified
?
RelCollations.of(fieldCollations)
:
sortExchange.getCollation();

call.transformTo(call.builder()
.push(sortExchange.getInput())
.sortExchange(distribution, collation)
.build());
call.getPlanner().setImportance(sortExchange, 0.0);
}
}
}
}

// End ExchangeRemoveConstantKeysRule.java
36 changes: 36 additions & 0 deletions core/src/test/java/org/apache/calcite/test/RelOptRulesTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@
import org.apache.calcite.plan.hep.HepProgramBuilder;
import org.apache.calcite.prepare.Prepare;
import org.apache.calcite.rel.RelCollationTraitDef;
import org.apache.calcite.rel.RelCollations;
import org.apache.calcite.rel.RelDistributions;
import org.apache.calcite.rel.RelFieldCollation;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelRoot;
import org.apache.calcite.rel.core.Aggregate;
Expand Down Expand Up @@ -62,6 +65,7 @@
import org.apache.calcite.rel.rules.CalcMergeRule;
import org.apache.calcite.rel.rules.CoerceInputsRule;
import org.apache.calcite.rel.rules.DateRangeRules;
import org.apache.calcite.rel.rules.ExchangeRemoveConstantKeysRule;
import org.apache.calcite.rel.rules.FilterAggregateTransposeRule;
import org.apache.calcite.rel.rules.FilterJoinRule;
import org.apache.calcite.rel.rules.FilterMergeRule;
Expand Down Expand Up @@ -4311,6 +4315,38 @@ private Sql checkSubQuery(String sql) {
checkPlanning(program, sql);
}

@Test public void testExchangeRemoveConstantKeysRule() {
final DiffRepository diffRepos = getDiffRepos();
final RelBuilder builder = RelBuilder.create(RelBuilderTest.config().build());
RelNode root = builder
.scan("EMP")
.filter(
builder.call(SqlStdOperatorTable.EQUALS,
builder.field("EMPNO"), builder.literal(10)))
.exchange(RelDistributions.hash(ImmutableList.of(0)))
.project(builder.field(0), builder.field(1))
.sortExchange(RelDistributions.hash(ImmutableList.of(0, 1)),
RelCollations.of(new RelFieldCollation(0), new RelFieldCollation(1)))
.build();

HepProgram preProgram = new HepProgramBuilder().build();
HepPlanner prePlanner = new HepPlanner(preProgram);
prePlanner.setRoot(root);
final RelNode relBefore = prePlanner.findBestExp();
final String planBefore = NL + RelOptUtil.toString(relBefore);
diffRepos.assertEquals("planBefore", "${planBefore}", planBefore);

HepProgram hepProgram = new HepProgramBuilder()
.addRuleInstance(ExchangeRemoveConstantKeysRule.EXCHANGE_INSTANCE)
.addRuleInstance(ExchangeRemoveConstantKeysRule.SORT_EXCHANGE_INSTANCE)
.build();

HepPlanner hepPlanner = new HepPlanner(hepProgram);
hepPlanner.setRoot(root);
final RelNode relAfter = hepPlanner.findBestExp();
final String planAfter = NL + RelOptUtil.toString(relAfter);
diffRepos.assertEquals("planAfter", "${planAfter}", planAfter);
}
}

// End RelOptRulesTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -8851,6 +8851,26 @@ LogicalProject($0=[$2], $1=[$3], $2=[$4])
LogicalProject(COL1=[SUM(100) OVER (PARTITION BY $7, $5 ORDER BY $5 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)], COL2=[SUM(100) OVER (PARTITION BY $5 ORDER BY $7 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)], COL3=[SUM($5) OVER (PARTITION BY $7 ORDER BY $5 RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)])
LogicalFilter(condition=[=($5, 5000)])
LogicalTableScan(table=[[CATALOG, SALES, EMP]])
]]>
</Resource>
</TestCase>
<TestCase name="testExchangeRemoveConstantKeysRule">
<Resource name="planBefore">
<![CDATA[
LogicalSortExchange(distribution=[hash[0, 1]], collation=[[0, 1]])
LogicalProject(EMPNO=[$0], ENAME=[$1])
LogicalExchange(distribution=[hash[0]])
LogicalFilter(condition=[=($0, 10)])
LogicalTableScan(table=[[scott, EMP]])
]]>
</Resource>
<Resource name="planAfter">
<![CDATA[
LogicalSortExchange(distribution=[hash[1]], collation=[[1]])
LogicalProject(EMPNO=[$0], ENAME=[$1])
LogicalExchange(distribution=[single])
LogicalFilter(condition=[=($0, 10)])
LogicalTableScan(table=[[scott, EMP]])
]]>
</Resource>
</TestCase>
Expand Down

0 comments on commit 0537f27

Please sign in to comment.