Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[IOTDB-6297] Optimize the distribute plan in the situation of aggregation with align by device #12043

Merged
merged 11 commits into from
Feb 22, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,11 @@ aggregation results last_value(temperature) and last_value(status), whereas buck

// indicates whether DeviceView need special process when rewriteSource in DistributionPlan,
// you can see SourceRewriter#visitDeviceView to get more information
// deviceViewSpecialProcess equals true when all Aggregation Functions and DIFF
private boolean deviceViewSpecialProcess;

private boolean existDeviceCrossRegion;

/////////////////////////////////////////////////////////////////////////////////////////////////
// Query Common Analysis (above DeviceView)
/////////////////////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -655,6 +658,14 @@ public void setDeviceViewSpecialProcess(boolean deviceViewSpecialProcess) {
this.deviceViewSpecialProcess = deviceViewSpecialProcess;
}

public boolean isExistDeviceCrossRegion() {
return existDeviceCrossRegion;
}

public void setExistDeviceCrossRegion() {
this.existDeviceCrossRegion = true;
}

public DeviceViewIntoPathDescriptor getDeviceViewIntoPathDescriptor() {
return deviceViewIntoPathDescriptor;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -829,7 +829,6 @@ public LogicalPlanBuilder planDeviceView(
-1);
this.root = mergeSortNode;
} else {
// order by based on device, use DeviceViewNode
this.root =
addDeviceViewNode(
orderByParameter,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ private void adjustUpStreamHelper(
if (child instanceof ExchangeNode) {
ExchangeNode exchangeNode = (ExchangeNode) child;
TRegionReplicaSet regionOfChild =
context.getNodeDistribution(exchangeNode.getChild().getPlanNodeId()).region;
context.getNodeDistribution(exchangeNode.getChild().getPlanNodeId()).getRegion();
MultiChildrenSinkNode newChild =
memo.computeIfAbsent(
regionOfChild,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.SchemaQueryMergeNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.SchemaQueryOrderByHeatNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metedata.read.SchemaQueryScanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggregationMergeSortNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.AggregationNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.DeviceMergeNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.DeviceViewNode;
Expand Down Expand Up @@ -115,13 +116,14 @@ private PlanNode internalVisitSchemaMerge(
NodeDistribution nodeDistribution =
new NodeDistribution(NodeDistributionType.DIFFERENT_FROM_ALL_CHILDREN);
PlanNode newNode = node.clone();
nodeDistribution.region = calculateSchemaRegionByChildren(node.getChildren(), context);
nodeDistribution.setRegion(calculateSchemaRegionByChildren(node.getChildren(), context));
context.putNodeDistribution(newNode.getPlanNodeId(), nodeDistribution);
node.getChildren()
.forEach(
child -> {
if (!nodeDistribution.region.equals(
context.getNodeDistribution(child.getPlanNodeId()).region)) {
if (!nodeDistribution
.getRegion()
.equals(context.getNodeDistribution(child.getPlanNodeId()).getRegion())) {
ExchangeNode exchangeNode =
new ExchangeNode(context.queryContext.getQueryId().genPlanNodeId());
exchangeNode.setChild(child);
Expand Down Expand Up @@ -200,6 +202,12 @@ public PlanNode visitDeviceView(DeviceViewNode node, NodeGroupContext context) {
return processMultiChildNode(node, context);
}

@Override
public PlanNode visitAggregationMergeSort(
AggregationMergeSortNode node, NodeGroupContext context) {
return processMultiChildNode(node, context);
}

@Override
public PlanNode visitDeviceMerge(DeviceMergeNode node, NodeGroupContext context) {
return processMultiChildNode(node, context);
Expand Down Expand Up @@ -265,7 +273,7 @@ public PlanNode visitLeftOuterTimeJoin(LeftOuterTimeJoinNode node, NodeGroupCont
// else we set the selected mostlyUsedDataRegion to this node
dataRegion =
isChildrenDistributionSame
? context.getNodeDistribution(leftChild.getPlanNodeId()).region
? context.getNodeDistribution(leftChild.getPlanNodeId()).getRegion()
: context.getMostlyUsedDataRegion();
context.putNodeDistribution(
newNode.getPlanNodeId(), new NodeDistribution(distributionType, dataRegion));
Expand All @@ -284,7 +292,7 @@ public PlanNode visitLeftOuterTimeJoin(LeftOuterTimeJoinNode node, NodeGroupCont

// Otherwise, we need to add ExchangeNode for the child whose DataRegion is different from the
// parent.
if (!dataRegion.equals(context.getNodeDistribution(leftChild.getPlanNodeId()).region)) {
if (!dataRegion.equals(context.getNodeDistribution(leftChild.getPlanNodeId()).getRegion())) {
if (leftChild instanceof SingleDeviceViewNode) {
((SingleDeviceViewNode) leftChild).setCacheOutputColumnNames(true);
}
Expand All @@ -298,7 +306,7 @@ public PlanNode visitLeftOuterTimeJoin(LeftOuterTimeJoinNode node, NodeGroupCont
newNode.setLeftChild(leftChild);
}

if (!dataRegion.equals(context.getNodeDistribution(rightChild.getPlanNodeId()).region)) {
if (!dataRegion.equals(context.getNodeDistribution(rightChild.getPlanNodeId()).getRegion())) {
if (rightChild instanceof SingleDeviceViewNode) {
((SingleDeviceViewNode) rightChild).setCacheOutputColumnNames(true);
}
Expand Down Expand Up @@ -377,7 +385,7 @@ private PlanNode processMultiChildNode(MultiChildProcessNode node, NodeGroupCont
.map(child -> visit(child, context))
.collect(Collectors.toList());

// DataRegion which node locates
// DataRegion in which node locates
TRegionReplicaSet dataRegion;
boolean isChildrenDistributionSame = nodeDistributionIsSame(visitedChildren, context);
NodeDistributionType distributionType =
Expand All @@ -390,7 +398,7 @@ private PlanNode processMultiChildNode(MultiChildProcessNode node, NodeGroupCont
// else we set the selected mostlyUsedDataRegion to this node
dataRegion =
isChildrenDistributionSame
? context.getNodeDistribution(visitedChildren.get(0).getPlanNodeId()).region
? context.getNodeDistribution(visitedChildren.get(0).getPlanNodeId()).getRegion()
: context.getMostlyUsedDataRegion();
context.putNodeDistribution(
newNode.getPlanNodeId(), new NodeDistribution(distributionType, dataRegion));
Expand All @@ -409,21 +417,17 @@ private PlanNode processMultiChildNode(MultiChildProcessNode node, NodeGroupCont
// optimize `order by time|expression limit N align by device` query,
// to ensure that the number of ExchangeNode equals to DataRegionNum but not equals to DeviceNum
if (node instanceof TopKNode) {
return processTopNode(node, visitedChildren, context, newNode, dataRegion);
return processTopKNode(node, visitedChildren, context, newNode, dataRegion);
}

// Otherwise, we need to add ExchangeNode for the child whose DataRegion is different from the
// parent.
for (PlanNode child : visitedChildren) {
if (!dataRegion.equals(context.getNodeDistribution(child.getPlanNodeId()).region)) {
if (!dataRegion.equals(context.getNodeDistribution(child.getPlanNodeId()).getRegion())) {
if (child instanceof SingleDeviceViewNode) {
((SingleDeviceViewNode) child).setCacheOutputColumnNames(true);
}
ExchangeNode exchangeNode =
new ExchangeNode(context.queryContext.getQueryId().genPlanNodeId());
exchangeNode.setChild(child);
exchangeNode.setOutputColumnNames(child.getOutputColumnNames());
context.hasExchangeNode = true;
ExchangeNode exchangeNode = genExchangeNode(context, child);
newNode.addChild(exchangeNode);
} else {
newNode.addChild(child);
Expand All @@ -450,7 +454,7 @@ private PlanNode processMultiChildNodeByLocation(
return newNode;
}

private PlanNode processTopNode(
private PlanNode processTopKNode(
MultiChildProcessNode node,
List<PlanNode> visitedChildren,
NodeGroupContext context,
Expand All @@ -459,7 +463,7 @@ private PlanNode processTopNode(
TopKNode rootNode = (TopKNode) node;
Map<TRegionReplicaSet, TopKNode> regionTopKNodeMap = new HashMap<>();
for (PlanNode child : visitedChildren) {
TRegionReplicaSet region = context.getNodeDistribution(child.getPlanNodeId()).region;
TRegionReplicaSet region = context.getNodeDistribution(child.getPlanNodeId()).getRegion();
regionTopKNodeMap
.computeIfAbsent(
region,
Expand All @@ -483,11 +487,7 @@ private PlanNode processTopNode(
TopKNode topKNode = entry.getValue();

if (!dataRegion.equals(topKNodeLocatedRegion)) {
ExchangeNode exchangeNode =
new ExchangeNode(context.queryContext.getQueryId().genPlanNodeId());
exchangeNode.setChild(topKNode);
exchangeNode.setOutputColumnNames(topKNode.getOutputColumnNames());
context.hasExchangeNode = true;
ExchangeNode exchangeNode = genExchangeNode(context, topKNode);
newNode.addChild(exchangeNode);
} else {
newNode.addChild(topKNode);
Expand All @@ -496,6 +496,14 @@ private PlanNode processTopNode(
return newNode;
}

private ExchangeNode genExchangeNode(NodeGroupContext context, PlanNode child) {
ExchangeNode exchangeNode = new ExchangeNode(context.queryContext.getQueryId().genPlanNodeId());
exchangeNode.setChild(child);
exchangeNode.setOutputColumnNames(child.getOutputColumnNames());
context.hasExchangeNode = true;
return exchangeNode;
}

@Override
public PlanNode visitSlidingWindowAggregation(
SlidingWindowAggregationNode node, NodeGroupContext context) {
Expand All @@ -506,7 +514,7 @@ private PlanNode processOneChildNode(PlanNode node, NodeGroupContext context) {
PlanNode newNode = node.clone();
PlanNode child = visit(node.getChildren().get(0), context);
newNode.addChild(child);
TRegionReplicaSet dataRegion = context.getNodeDistribution(child.getPlanNodeId()).region;
TRegionReplicaSet dataRegion = context.getNodeDistribution(child.getPlanNodeId()).getRegion();
context.putNodeDistribution(
newNode.getPlanNodeId(),
new NodeDistribution(NodeDistributionType.SAME_WITH_ALL_CHILDREN, dataRegion));
Expand All @@ -523,9 +531,9 @@ private TRegionReplicaSet calculateDataRegionByChildren(
Collectors.groupingBy(
child -> {
TRegionReplicaSet region =
context.getNodeDistribution(child.getPlanNodeId()).region;
context.getNodeDistribution(child.getPlanNodeId()).getRegion();
if (region == null
&& context.getNodeDistribution(child.getPlanNodeId()).type
&& context.getNodeDistribution(child.getPlanNodeId()).getType()
== NodeDistributionType.SAME_WITH_ALL_CHILDREN) {
return calculateSchemaRegionByChildren(child.getChildren(), context);
}
Expand Down Expand Up @@ -571,15 +579,15 @@ private TRegionReplicaSet calculateDataRegionByChildren(
private TRegionReplicaSet calculateSchemaRegionByChildren(
List<PlanNode> children, NodeGroupContext context) {
// We always make the schemaRegion of MetaMergeNode to be the same as its first child.
return context.getNodeDistribution(children.get(0).getPlanNodeId()).region;
return context.getNodeDistribution(children.get(0).getPlanNodeId()).getRegion();
}

private boolean nodeDistributionIsSame(List<PlanNode> children, NodeGroupContext context) {
// The size of children here should always be larger than 0, or our code has Bug.
NodeDistribution first = context.getNodeDistribution(children.get(0).getPlanNodeId());
for (int i = 1; i < children.size(); i++) {
NodeDistribution next = context.getNodeDistribution(children.get(i).getPlanNodeId());
if (first.region == null || !first.region.equals(next.region)) {
if (first.getRegion() == null || !first.getRegion().equals(next.getRegion())) {
return false;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,31 @@
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;

public class NodeDistribution {
protected NodeDistributionType type;
protected TRegionReplicaSet region;
private NodeDistributionType type;
private TRegionReplicaSet region;

protected NodeDistribution(NodeDistributionType type, TRegionReplicaSet region) {
public NodeDistribution(NodeDistributionType type, TRegionReplicaSet region) {
this.type = type;
this.region = region;
}

protected NodeDistribution(NodeDistributionType type) {
public NodeDistribution(NodeDistributionType type) {
this.type = type;
}

public NodeDistributionType getType() {
return this.type;
}

public void setType(NodeDistributionType type) {
this.type = type;
}

public TRegionReplicaSet getRegion() {
return this.region;
}

public void setRegion(TRegionReplicaSet region) {
this.region = region;
}
}
Loading
Loading