Skip to content

Commit

Permalink
update olap table partition cold time when related_olap_table property is set #26
Browse files Browse the repository at this point in the history
  • Loading branch information
hoffermei(梅海峰) committed Mar 22, 2023
1 parent 6bb060c commit e0440f4
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 0 deletions.
102 changes: 102 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/catalog/IcebergTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,18 @@
import com.starrocks.common.DdlException;
import com.starrocks.common.UserException;
import com.starrocks.common.io.Text;
import com.starrocks.common.util.DynamicPartitionUtil;
import com.starrocks.common.util.PropertyAnalyzer;
import com.starrocks.common.util.TimeUtils;
import com.starrocks.external.iceberg.IcebergCatalog;
import com.starrocks.external.iceberg.IcebergCatalogType;
import com.starrocks.external.iceberg.IcebergHiveCatalog;
import com.starrocks.external.iceberg.IcebergUtil;
import com.starrocks.external.iceberg.StarRocksIcebergException;
import com.starrocks.external.iceberg.io.IcebergCachingFileIO;
import com.starrocks.persist.ModifyPartitionInfo;
import com.starrocks.qe.ConnectContext;
import com.starrocks.sql.common.UnsupportedException;
import com.starrocks.sql.optimizer.operator.scalar.ColumnRefOperator;
import com.starrocks.sql.optimizer.statistics.ColumnStatistic;
import com.starrocks.sql.optimizer.statistics.Statistics;
Expand All @@ -35,13 +39,17 @@
import com.starrocks.thrift.TIcebergTablePartitionColumn;
import com.starrocks.thrift.TTableDescriptor;
import com.starrocks.thrift.TTableType;
import org.apache.commons.collections.map.HashedMap;
import org.apache.commons.lang3.StringUtils;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.UpdateSchema;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.logging.log4j.LogManager;
Expand All @@ -50,16 +58,24 @@
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import static java.util.stream.Collectors.toList;
import static org.apache.iceberg.NullOrder.NULLS_FIRST;
import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;
import static org.apache.iceberg.util.DateTimeUtil.EPOCH_DAY;

public class IcebergTable extends Table {
private static final Logger LOG = LogManager.getLogger(IcebergTable.class);
Expand Down Expand Up @@ -158,6 +174,17 @@ public void setRelatedOlapTable(String relatedOlapTable) {
icebergProperties.put(PropertyAnalyzer.RELATED_OLAP_TABLE, relatedOlapTable);
}

/**
 * Resolves the {@link Database} that holds the related OLAP table.
 * <p>
 * The related table name is expected in {@code db.table} form; the database
 * part is qualified with the default cluster name before lookup.
 *
 * @return the database of the related OLAP table, or {@code null} when no
 *         related table is configured, the name is not in {@code db.table}
 *         form, or the database does not exist
 */
public Database getRelatedOlapTableDb() {
    String olapTableName = getRelatedOlapTableName();
    if (Strings.isNullOrEmpty(olapTableName)) {
        return null;
    }
    String[] parts = olapTableName.split("\\.");
    if (parts.length < 2) {
        // BUGFIX: the original looked up parts[0] even when the name had no
        // "db." qualifier, treating a bare table name as a database name.
        LOG.warn("related olap table name is not in db.table form: {}", olapTableName);
        return null;
    }
    return Catalog.getCurrentCatalog().getDb(
            ClusterNamespace.getFullName(SystemInfoService.DEFAULT_CLUSTER, parts[0]));
}

/**
 * Returns the compact compute node selector configured on this Iceberg
 * table, or {@code null} when the property has not been set.
 */
public String getCompactComputeNodeSelector() {
    // Map.get returns null for an absent key, matching the documented default.
    return icebergProperties.get(PropertyAnalyzer.COMPACT_COMPUTE_NODE_SELECTOR);
}
Expand Down Expand Up @@ -683,4 +710,79 @@ public List<ColumnStatistic> getColumnStats(Map<ColumnRefOperator, Column> colRe
/**
 * Hook invoked when this Iceberg table is dropped; evicts cached entries
 * from the global Iceberg repository.
 */
public void onDrop() {
    Catalog currentCatalog = Catalog.getCurrentCatalog();
    currentCatalog.getIcebergRepository().clearCache();
}

/**
 * Updates the cold-down synced time of partitions on the related OLAP table
 * based on the partition lower bounds recorded in the given Iceberg data files.
 * <p>
 * For each data file, the lower bound of the single Iceberg partition source
 * column (TIMESTAMP or DATE only) is mapped to a dynamic-partition name on the
 * related OLAP table; matching partitions get their cold-down time set to now,
 * and each change is written to the edit log as a {@link ModifyPartitionInfo}.
 *
 * @param dataFiles data files committed to this Iceberg table
 * @throws UserException if the Iceberg table has more than one partition
 *                       field, the field type is unsupported, or the OLAP
 *                       partition format cannot be derived
 */
public void updateRelatedOlapTablePartitions(Set<DataFile> dataFiles) throws UserException {
    org.apache.iceberg.Table iTable = this.getIcebergTable();
    List<PartitionField> partitionFields = iTable.spec().fields();
    if (partitionFields.size() != 1) {
        LOG.warn("only support one partition field currently, but get: {}", partitionFields.size());
        throw UnsupportedException.unsupportedException("only support one partition field currently");
    }
    OlapTable olapTable = getRelatedOlapTable();
    DynamicPartitionProperty dynamicPartitionProperty = olapTable.getTableProperty().getDynamicPartitionProperty();
    RangePartitionInfo rangePartitionInfo = (RangePartitionInfo) olapTable.getPartitionInfo();
    Column partitionColumn = rangePartitionInfo.getPartitionColumns().get(0);
    String partitionFormat;
    try {
        partitionFormat = DynamicPartitionUtil.getPartitionFormat(partitionColumn);
    } catch (DdlException e) {
        throw new UserException(e);
    }

    Types.NestedField field = iTable.schema().findField(partitionFields.get(0).sourceId());
    // BUGFIX: was a raw commons-collections HashedMap; use the typed
    // java.util.HashMap already imported by this file.
    Map<Long, Long> partitionMap = new HashMap<>();
    for (DataFile dataFile : dataFiles) {
        ByteBuffer value = dataFile.lowerBounds().get(field.fieldId());
        if (value == null) {
            // No lower-bound stats recorded for this column in this file.
            continue;
        }
        ZonedDateTime start;
        if (field.type().typeId() == Type.TypeID.TIMESTAMP) {
            // Iceberg timestamp bounds are microseconds since epoch; convert
            // to milliseconds for Instant. Demoted from info to debug level.
            long lower = Conversions.fromByteBuffer(field.type(), value);
            LOG.debug("lower: {}", lower);
            LocalDateTime triggerTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(lower / 1000),
                    TimeUtils.getTimeZone().toZoneId());
            start = ZonedDateTime.of(triggerTime, TimeUtils.getTimeZone().toZoneId());
        } else if (field.type().typeId() == Type.TypeID.DATE) {
            // DATE bounds are days since epoch. Reuse the already-fetched
            // buffer instead of looking it up a second time (was redundant).
            int lower = Conversions.fromByteBuffer(field.type(), value);
            LocalDate date = ChronoUnit.DAYS.addTo(EPOCH_DAY, lower);
            start = date.atStartOfDay(TimeUtils.getTimeZone().toZoneId());
            LOG.debug("lower: {}", lower);
        } else {
            // BUGFIX: added the missing space before the type name.
            throw UnsupportedException.unsupportedException(
                    "unsupported partition field type " + field.type().typeId().name());
        }

        String prevBorder =
                DynamicPartitionUtil.getPartitionRangeString(dynamicPartitionProperty, start, 0, partitionFormat);
        String partitionName = dynamicPartitionProperty.getPrefix() + DynamicPartitionUtil.getFormattedPartitionName(
                dynamicPartitionProperty.getTimeZone(), prevBorder, dynamicPartitionProperty.getTimeUnit());
        LOG.debug("partitionName: {}", partitionName);

        Partition partition = olapTable.getPartition(partitionName);
        if (partition == null) {
            // The corresponding OLAP partition may not exist (yet, or anymore).
            continue;
        }
        long coldDownTimeMs = System.currentTimeMillis();
        partitionMap.put(partition.getId(), coldDownTimeMs);
    }

    if (partitionMap.isEmpty()) {
        return;
    }
    Database db = getRelatedOlapTableDb();
    if (db == null) {
        // BUGFIX: the original dereferenced db without a null check; the
        // related db can be absent (dropped or misconfigured).
        LOG.warn("database of related olap table {} not found, skip cold down time update", olapTable);
        return;
    }
    for (Map.Entry<Long, Long> entry : partitionMap.entrySet()) {
        rangePartitionInfo.setColdDownSyncedTimeMs(entry.getKey(), entry.getValue());
        // Persist the change through the edit log so followers replay it.
        ModifyPartitionInfo info = new ModifyPartitionInfo(
                db.getId(), olapTable.getId(), entry.getKey(),
                DataProperty.DEFAULT_DATA_PROPERTY, (short) -1,
                rangePartitionInfo.getIsInMemory(entry.getKey()),
                entry.getValue(), -1L, -1L, -1L);
        Catalog.getCurrentCatalog().getEditLog().logModifyPartition(info);
        LOG.info("modify partition {} cold down time on related olap table {} to {}",
                entry.getKey(), olapTable, entry.getValue());
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.starrocks.catalog.Catalog;
import com.starrocks.catalog.DataProperty;
import com.starrocks.catalog.Database;
import com.starrocks.catalog.IcebergTable;
import com.starrocks.catalog.LocalTablet;
import com.starrocks.catalog.MaterializedIndex;
import com.starrocks.catalog.OlapTable;
Expand Down Expand Up @@ -607,6 +608,9 @@ public void commitTransaction(long transactionId, Table tbl,
if (txnCommitAttachment != null) {
transactionState.setTxnCommitAttachment(txnCommitAttachment);
}
if (sinkCommitAttachment != null) {
transactionState.setSinkCommitAttachment(sinkCommitAttachment);
}

// before state transform
TxnStateChangeCallback callback = transactionState.beforeStateTransform(TransactionStatus.COMMITTED);
Expand Down Expand Up @@ -661,6 +665,16 @@ public void commitTransaction(long transactionId, Table tbl,
transactionState.afterStateTransform(TransactionStatus.COMMITTED, txnOperated, callback, null);
}

// update related olap table partition sync property
if (tbl.getType() == Table.TableType.ICEBERG) {
IcebergTable icebergTable = (IcebergTable) tbl;
OlapTable olapTable = icebergTable.getRelatedOlapTable();
if (olapTable != null) {
org.apache.iceberg.Table iTable = icebergTable.getIcebergTable();
icebergTable.updateRelatedOlapTablePartitions(sinkCommitAttachment.parseIcebergDataFiles(iTable));
}
}

// 6. update nextVersion because of the failure of persistent transaction resulting in error version
updateCatalogAfterCommitted(transactionState, db);
LOG.info("transaction:[{}] successfully committed", transactionState);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import com.starrocks.common.UserException;
import com.starrocks.common.io.Text;
import com.starrocks.common.io.Writable;
import com.starrocks.load.loadv2.SinkCommitAttachment;
import com.starrocks.metric.MetricRepo;
import com.starrocks.task.PublishVersionTask;
import com.starrocks.thrift.TPartitionVersionInfo;
Expand Down Expand Up @@ -213,6 +214,7 @@ public String toString() {

// optional
private TxnCommitAttachment txnCommitAttachment;
private SinkCommitAttachment sinkCommitAttachment;

// this map should be set when load execution begin, so that when the txn commit, it will know
// which tables and rollups it loaded.
Expand Down Expand Up @@ -345,6 +347,10 @@ public TxnCommitAttachment getTxnCommitAttachment() {
return txnCommitAttachment;
}

/**
 * @return the sink commit attachment bound to this transaction state,
 *         or {@code null} if none has been set
 */
public SinkCommitAttachment getSinkCommitAttachment() {
    return this.sinkCommitAttachment;
}

/**
 * @return the id of the callback registered for this transaction
 */
public long getCallbackId() {
    return this.callbackId;
}
Expand Down Expand Up @@ -511,6 +517,10 @@ public void setTxnCommitAttachment(TxnCommitAttachment txnCommitAttachment) {
this.txnCommitAttachment = txnCommitAttachment;
}

/**
 * Binds the sink-side commit attachment reported at commit time to this
 * transaction state.
 *
 * @param attachment attachment carrying sink commit information; may be null
 */
public void setSinkCommitAttachment(SinkCommitAttachment attachment) {
    this.sinkCommitAttachment = attachment;
}

// return true if txn is in final status and label is expired
public boolean isExpired(long currentMillis) {
return transactionStatus.isFinalStatus() && (currentMillis - finishTime) / 1000 > Config.label_keep_max_second;
Expand Down

0 comments on commit e0440f4

Please sign in to comment.