Skip to content

Commit

Permalink
[Enhancement] improve statistics strategy for insert overwrite (#50417)
Browse files Browse the repository at this point in the history
Signed-off-by: Murphy <[email protected]>
(cherry picked from commit 5126313)
Signed-off-by: Murphy <[email protected]>
  • Loading branch information
murphyatwork committed Oct 10, 2024
1 parent 4aa95e2 commit 66d1aa1
Show file tree
Hide file tree
Showing 19 changed files with 860 additions and 122 deletions.
4 changes: 4 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/catalog/OlapTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -1522,6 +1522,10 @@ public Partition getPartition(long partitionId) {
return partition;
}

public Optional<Partition> mayGetPartition(long partitionId) {
return Optional.ofNullable(getPartition(partitionId));
}

public PhysicalPartition getPhysicalPartition(long physicalPartitionId) {
Long partitionId = physicalPartitionIdToPartitionId.get(physicalPartitionId);
if (partitionId == null) {
Expand Down
4 changes: 4 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/common/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -2075,6 +2075,10 @@ public class Config extends ConfigBase {
@ConfField(mutable = true)
public static long statistic_sample_collect_rows = 200000;

@ConfField(mutable = true, comment = "If changed ratio of a table/partition is larger than this threshold, " +
"we would use sample statistics instead of full statistics")
public static double statistic_sample_collect_ratio_threshold_of_first_load = 0.1;

/**
* default bucket size of histogram statistics
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import com.google.common.collect.ImmutableList;
import com.starrocks.catalog.Database;
import com.starrocks.catalog.Table;
import com.starrocks.qe.DmlType;
import com.starrocks.transaction.InsertOverwriteJobStats;
import com.starrocks.transaction.TransactionState;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -53,23 +55,26 @@ public void onLoadJobTransactionFinish(TransactionState transactionState) {
* @param db database of the target table
* @param table target table that has changed
*/
public void onDMLStmtJobTransactionFinish(TransactionState transactionState, Database db, Table table) {
public void onDMLStmtJobTransactionFinish(TransactionState transactionState, Database db, Table table,
DmlType dmlType) {
if (transactionState == null) {
return;
}
listeners.stream().forEach(listener -> listener.onDMLStmtJobTransactionFinish(transactionState, db, table));
listeners.stream().forEach(listener -> listener.onDMLStmtJobTransactionFinish(transactionState, db, table,
dmlType
));
}

/**
* Do all callbacks after `Insert OVERWRITE` transaction is finished, which is only triggered without an error.
* @param db database of the target table
* @param table target table that has changed
*/
public void onInsertOverwriteJobCommitFinish(Database db, Table table) {
public void onInsertOverwriteJobCommitFinish(Database db, Table table, InsertOverwriteJobStats stats) {
if (db == null || table == null) {
return;
}
listeners.stream().forEach(listener -> listener.onInsertOverwriteJobCommitFinish(db, table));
listeners.stream().forEach(listener -> listener.onInsertOverwriteJobCommitFinish(db, table, stats));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

import com.starrocks.catalog.Database;
import com.starrocks.catalog.Table;
import com.starrocks.qe.DmlType;
import com.starrocks.transaction.InsertOverwriteJobStats;
import com.starrocks.transaction.TransactionState;

/**
Expand All @@ -38,18 +40,22 @@ public interface LoadJobListener {

/**
* Listener after `Insert INTO` transaction is finished, which is only triggered without an error.
*
* @param transactionState finished transaction states
* @param db database of the target table
* @param table target table that has changed
* @param db database of the target table
* @param table target table that has changed
* @param dmlType
*/
void onDMLStmtJobTransactionFinish(TransactionState transactionState, Database db, Table table);
void onDMLStmtJobTransactionFinish(TransactionState transactionState, Database db, Table table, DmlType dmlType);

/**
* Listener after `Insert OVERWRITE` transaction is finished, which is only triggered without an error.
* @param db database of the target table
*
* @param db database of the target table
* @param table target table that has changed
* @param stats
*/
void onInsertOverwriteJobCommitFinish(Database db, Table table);
void onInsertOverwriteJobCommitFinish(Database db, Table table, InsertOverwriteJobStats stats);

/**
* Listener after `Delete` transaction is finished, which is only triggered without an error.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@
import com.starrocks.common.DdlException;
import com.starrocks.common.MetaNotFoundException;
import com.starrocks.common.util.DebugUtil;
import com.starrocks.qe.DmlType;
import com.starrocks.scheduler.Constants;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.transaction.InsertOverwriteJobStats;
import com.starrocks.transaction.PartitionCommitInfo;
import com.starrocks.transaction.TableCommitInfo;
import com.starrocks.transaction.TransactionState;
Expand Down Expand Up @@ -69,7 +71,8 @@ public void onLoadJobTransactionFinish(TransactionState transactionState) {
}

@Override
public void onDMLStmtJobTransactionFinish(TransactionState transactionState, Database db, Table table) {
public void onDMLStmtJobTransactionFinish(TransactionState transactionState, Database db, Table table,
DmlType dmlType) {
if (table != null && table.isMaterializedView()) {
return;
}
Expand All @@ -80,7 +83,7 @@ public void onDMLStmtJobTransactionFinish(TransactionState transactionState, Dat
}

@Override
public void onInsertOverwriteJobCommitFinish(Database db, Table table) {
public void onInsertOverwriteJobCommitFinish(Database db, Table table, InsertOverwriteJobStats stats) {
triggerToRefreshRelatedMVs(db, table);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@
import com.starrocks.catalog.Table;
import com.starrocks.common.Config;
import com.starrocks.common.util.DebugUtil;
import com.starrocks.qe.DmlType;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.server.LocalMetastore;
import com.starrocks.statistic.StatisticUtils;
import com.starrocks.transaction.InsertOverwriteJobStats;
import com.starrocks.transaction.TransactionState;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -50,19 +52,41 @@ public void onLoadJobTransactionFinish(TransactionState transactionState) {
}

@Override
public void onDMLStmtJobTransactionFinish(TransactionState transactionState, Database db, Table table) {
StatisticUtils.triggerCollectionOnFirstLoad(transactionState, db, table, true, true);
public void onDMLStmtJobTransactionFinish(TransactionState transactionState, Database db, Table table,
DmlType dmlType) {
if (dmlType != DmlType.INSERT_OVERWRITE && needTrigger()) {
StatisticUtils.triggerCollectionOnFirstLoad(transactionState, db, table, true, true);
}
}

@Override
public void onInsertOverwriteJobCommitFinish(Database db, Table table) {
// do nothing
public void onInsertOverwriteJobCommitFinish(Database db, Table table, InsertOverwriteJobStats stats) {
if (needTrigger()) {
StatisticUtils.triggerCollectionOnInsertOverwrite(stats, db, table, true, true);
}
}

/**
* Whether to trigger the statistics collection
*/
private boolean needTrigger() {
if (GlobalStateMgr.isCheckpointThread()) {
return false;
}
GlobalStateMgr stateMgr = GlobalStateMgr.getCurrentState();
if (stateMgr == null || !stateMgr.isLeader() || !stateMgr.isReady()) {
return false;
}
return true;
}

private void onTransactionFinish(TransactionState transactionState, boolean sync) {
if (!Config.enable_statistic_collect_on_first_load) {
return;
}
if (!needTrigger()) {
return;
}

long dbId = transactionState.getDbId();
LocalMetastore localMetastore = GlobalStateMgr.getCurrentState().getLocalMetastore();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@
import com.starrocks.sql.ast.RangePartitionDesc;
import com.starrocks.sql.common.DmlException;
import com.starrocks.sql.plan.ExecPlan;
import com.starrocks.transaction.InsertOverwriteJobStats;
import com.starrocks.transaction.TransactionState;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand Down Expand Up @@ -81,8 +83,10 @@ public class InsertOverwriteJobRunner {
private final long tableId;
private final String postfix;

// execution stat
private long createPartitionElapse;
private long insertElapse;
private TransactionState transactionState;

public InsertOverwriteJobRunner(InsertOverwriteJob job, ConnectContext context, StmtExecutor stmtExecutor) {
this.job = job;
Expand Down Expand Up @@ -441,6 +445,9 @@ private void doCommit(boolean isReplay) {
throw new DmlException("insert overwrite commit failed because locking db:%s failed", dbId);
}
OlapTable tmpTargetTable = null;
InsertOverwriteJobStats stats = new InsertOverwriteJobStats();
stats.setSourcePartitionIds(job.getSourcePartitionIds());
stats.setTargetPartitionIds(job.getTmpPartitionIds());
try {
// try exception to release write lock finally
final OlapTable targetTable = checkAndGetTable(db, tableId);
Expand All @@ -467,6 +474,10 @@ private void doCommit(boolean isReplay) {
sourceTablets.addAll(index.getTablets());
}
});
long sumSourceRows = job.getSourcePartitionIds().stream()
.mapToLong(p -> targetTable.mayGetPartition(p).stream().mapToLong(Partition::getRowCount).sum())
.sum();
stats.setSourceRows(sumSourceRows);

PartitionInfo partitionInfo = targetTable.getPartitionInfo();
if (partitionInfo.isRangePartition() || partitionInfo.getType() == PartitionType.LIST) {
Expand All @@ -476,6 +487,11 @@ private void doCommit(boolean isReplay) {
} else {
throw new DdlException("partition type " + partitionInfo.getType() + " is not supported");
}

long sumTargetRows = job.getTmpPartitionIds().stream()
.mapToLong(p -> targetTable.mayGetPartition(p).stream().mapToLong(Partition::getRowCount).sum())
.sum();
stats.setTargetRows(sumTargetRows);
if (!isReplay) {
// mark all source tablet ids force delete to drop it directly on BE,
// not to move it to trash
Expand Down Expand Up @@ -504,9 +520,12 @@ private void doCommit(boolean isReplay) {
locker.unLockDatabase(db, tableId, LockType.WRITE);
}

// trigger listeners after insert overwrite committed, trigger listeners after
// write unlock to avoid holding lock too long
GlobalStateMgr.getCurrentState().getOperationListenerBus().onInsertOverwriteJobCommitFinish(db, tmpTargetTable);
if (!isReplay) {
// trigger listeners after insert overwrite committed, trigger listeners after
// write unlock to avoid holding lock too long
GlobalStateMgr.getCurrentState().getOperationListenerBus()
.onInsertOverwriteJobCommitFinish(db, tmpTargetTable, stats);
}
}

private void prepareInsert() {
Expand Down
49 changes: 49 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/qe/DmlType.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.starrocks.qe;

import com.starrocks.sql.ast.DeleteStmt;
import com.starrocks.sql.ast.DmlStmt;
import com.starrocks.sql.ast.InsertStmt;
import com.starrocks.sql.ast.UpdateStmt;

/**
* Types of DML operation
*/
public enum DmlType {

INSERT_INTO,
INSERT_OVERWRITE,
UPDATE,
DELETE;

public static DmlType fromStmt(DmlStmt stmt) {
if (stmt instanceof InsertStmt) {
InsertStmt insertStmt = (InsertStmt) stmt;
if (insertStmt.isOverwrite()) {
return INSERT_OVERWRITE;
} else {
return INSERT_INTO;
}
} else if (stmt instanceof UpdateStmt) {
return UPDATE;
} else if (stmt instanceof DeleteStmt) {
return DELETE;
} else {
throw new UnsupportedOperationException("unsupported");
}
}

}
10 changes: 5 additions & 5 deletions fe/fe-core/src/main/java/com/starrocks/qe/StmtExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -2005,7 +2005,8 @@ public void handleDMLStmt(ExecPlan execPlan, DmlStmt stmt) throws Exception {
return;
}

MetaUtils.normalizationTableName(context, stmt.getTableName());
DmlType dmlType = DmlType.fromStmt(stmt);
stmt.getTableName().normalization(context);
String catalogName = stmt.getTableName().getCatalog();
String dbName = stmt.getTableName().getDb();
String tableName = stmt.getTableName().getTbl();
Expand All @@ -2023,8 +2024,7 @@ public void handleDMLStmt(ExecPlan execPlan, DmlStmt stmt) throws Exception {
"explain analyze only supports insert into olap native table");
}

if (parsedStmt instanceof InsertStmt && ((InsertStmt) parsedStmt).isOverwrite() &&
!((InsertStmt) parsedStmt).hasOverwriteJob() &&
if (dmlType == DmlType.INSERT_OVERWRITE && !((InsertStmt) parsedStmt).hasOverwriteJob() &&
!(targetTable.isIcebergTable() || targetTable.isHiveTable())) {
handleInsertOverwrite((InsertStmt) parsedStmt);
return;
Expand Down Expand Up @@ -2410,8 +2410,8 @@ public void handleDMLStmt(ExecPlan execPlan, DmlStmt stmt) throws Exception {
LOG.warn("errors when cancel insert load job {}", jobId);
}
} else if (txnState != null) {
GlobalStateMgr.getCurrentState().getOperationListenerBus().onDMLStmtJobTransactionFinish(txnState, database,
targetTable);
GlobalStateMgr.getCurrentState().getOperationListenerBus()
.onDMLStmtJobTransactionFinish(txnState, database, targetTable, dmlType);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,15 @@ default void refreshTableStatistic(Table table) {
default void refreshTableStatisticSync(Table table) {
}

/**
* Overwrite the statistics of `targetPartition` with `sourcePartition`
*/
default void overwritePartitionStatistics(long tableId, long sourcePartition, long targetPartition) {
}

default void updatePartitionStatistics(long tableId, long partition, long rows) {
}

ColumnStatistic getColumnStatistic(Table table, String column);

List<ColumnStatistic> getColumnStatistics(Table table, List<String> columns);
Expand Down
Loading

0 comments on commit 66d1aa1

Please sign in to comment.