[Lake] support delete from lake table
1. The FE creates a DeletePredicatePB, which the BE writes directly into tablet metadata.
2. The old push task API is no longer used.
wyb committed Aug 2, 2022
1 parent c6d468f commit 08df589
Showing 23 changed files with 1,193 additions and 475 deletions.
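
In short: the FE turns the DELETE conditions into a DeletePredicatePB, groups the target tablets by backend, and sends one DeleteDataRequest per backend over the new delete_data RPC; each BE records the predicate in its transaction log as an empty rowset (zero rows, zero bytes) whose only payload is the delete predicate, so no push tasks and no data files are involved. Below is a condensed, hypothetical sketch of the FE-side call, using only the proto classes and helpers that appear in this diff; the column name, literal values, tablet ids, transaction id, and backend address are placeholders, and the imports are the same ones LakeDeleteJob.java uses further down.

// Sketch only - not part of this commit. Assumes a statement like: DELETE FROM t WHERE k1 = "3"
DeletePredicatePB predicate = new DeletePredicatePB();
predicate.binaryPredicates = Lists.newArrayList();
predicate.isNullPredicates = Lists.newArrayList();
predicate.inPredicates = Lists.newArrayList();

BinaryPredicatePB eq = new BinaryPredicatePB();
eq.columnName = "k1";  // hypothetical column
eq.op = "=";           // placeholder; the real string comes from BinaryPredicate.getOp().toString()
eq.value = "3";        // literal rendered as a string
predicate.binaryPredicates.add(eq);

// One request per backend; the grouping comes from Utils.groupTabletID(partitions, MaterializedIndex.IndexExtState.VISIBLE).
List<Long> tabletIdsOnBackend = Lists.newArrayList(10001L, 10002L); // hypothetical tablet ids
long txnId = 12345L;                                                // hypothetical transaction id
TNetworkAddress backendAddress = new TNetworkAddress();             // BE brpc address, hypothetical values
backendAddress.setHostname("127.0.0.1");
backendAddress.setPort(8060);

DeleteDataRequest request = new DeleteDataRequest();
request.tabletIds = tabletIdsOnBackend;
request.txnId = txnId;
request.deletePredicate = predicate;
// deleteData may throw RpcException, as suggested by the catch block in LakeDeleteJob.run().
Future<DeleteDataResponse> future = new LakeServiceClient(backendAddress).deleteData(request);
// A non-empty failedTablets list in future.get() fails the whole delete; otherwise the FE
// commits and publishes the transaction (see LakeDeleteJob.commitImpl below).
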
1 change: 1 addition & 0 deletions be/src/gen_cpp/CMakeLists.txt
@@ -87,6 +87,7 @@ set(SRC_FILES
${GEN_CPP_DIR}/segment.pb.cc
${GEN_CPP_DIR}/persistent_index.pb.cc
${GEN_CPP_DIR}/tablet_schema.pb.cc
${GEN_CPP_DIR}/lake_delete.pb.cc
${GEN_CPP_DIR}/lake_types.pb.cc
${GEN_CPP_DIR}/lake_service.pb.cc
#$${GEN_CPP_DIR}/opcode/functions.cc
36 changes: 36 additions & 0 deletions be/src/service/service_be/lake_service.cpp
@@ -243,4 +243,40 @@ void LakeServiceImpl::drop_table(::google::protobuf::RpcController* controller,
}
}

void LakeServiceImpl::delete_data(::google::protobuf::RpcController* controller,
const ::starrocks::lake::DeleteDataRequest* request,
::starrocks::lake::DeleteDataResponse* response, ::google::protobuf::Closure* done) {
brpc::ClosureGuard guard(done);
auto cntl = static_cast<brpc::Controller*>(controller);

if (request->tablet_ids_size() == 0) {
cntl->SetFailed("missing tablet_ids");
return;
}
if (!request->has_txn_id()) {
cntl->SetFailed("missing txn_id");
return;
}
if (!request->has_delete_predicate()) {
cntl->SetFailed("missing delete_predicate");
return;
}

for (auto tablet_id : request->tablet_ids()) {
auto tablet = _env->lake_tablet_manager()->get_tablet(tablet_id);
if (!tablet.ok()) {
LOG(WARNING) << "Fail to get tablet " << tablet_id << ": " << tablet.status();
response->add_failed_tablets(tablet_id);
continue;
}

auto res = tablet->delete_data(request->txn_id(), request->delete_predicate());
if (!res.ok()) {
LOG(WARNING) << "Fail to delete data. tablet_id: " << tablet_id << ", txn_id: " << request->txn_id()
<< ", error: " << res;
response->add_failed_tablets(tablet_id);
}
}
}

} // namespace starrocks
5 changes: 4 additions & 1 deletion be/src/service/service_be/lake_service.h
@@ -25,12 +25,15 @@ class LakeServiceImpl : public ::starrocks::lake::LakeService {
void drop_tablet(::google::protobuf::RpcController* controller, const ::starrocks::lake::DropTabletRequest* request,
::starrocks::lake::DropTabletResponse* response, ::google::protobuf::Closure* done) override;

void compact(::google::protobuf::RpcController* controller, const ::starrocks::lake::CompactRequest* rquest,
void compact(::google::protobuf::RpcController* controller, const ::starrocks::lake::CompactRequest* request,
::starrocks::lake::CompactResponse* response, ::google::protobuf::Closure* done) override;

void drop_table(::google::protobuf::RpcController* controller, const ::starrocks::lake::DropTableRequest* request,
::starrocks::lake::DropTableResponse* response, ::google::protobuf::Closure* done) override;

void delete_data(::google::protobuf::RpcController* controller, const ::starrocks::lake::DeleteDataRequest* request,
::starrocks::lake::DeleteDataResponse* response, ::google::protobuf::Closure* done) override;

private:
ExecEnv* _env;
};
13 changes: 13 additions & 0 deletions be/src/storage/lake/tablet.cpp
@@ -105,4 +105,17 @@ std::string Tablet::root_location() const {
return _mgr->tablet_root_location(_id);
}

Status Tablet::delete_data(int64_t txn_id, const DeletePredicatePB& delete_predicate) {
auto txn_log = std::make_shared<lake::TxnLog>();
txn_log->set_tablet_id(_id);
txn_log->set_txn_id(txn_id);
auto op_write = txn_log->mutable_op_write();
auto rowset = op_write->mutable_rowset();
rowset->set_overlapped(false);
rowset->set_num_rows(0);
rowset->set_data_size(0);
rowset->mutable_delete_predicate()->CopyFrom(delete_predicate);
return put_txn_log(std::move(txn_log));
}

} // namespace starrocks::lake
3 changes: 3 additions & 0 deletions be/src/storage/lake/tablet.h
@@ -7,6 +7,7 @@
#include <string_view>

#include "common/statusor.h"
#include "gen_cpp/lake_delete.pb.h"
#include "storage/lake/metadata_iterator.h"
#include "storage/lake/rowset.h"
#include "storage/lake/tablet_metadata.h"
@@ -80,6 +81,8 @@ class Tablet {

[[nodiscard]] std::string segment_location(std::string_view segment_name) const;

Status delete_data(int64_t txn_id, const DeletePredicatePB& delete_predicate);

private:
TabletManager* _mgr;
int64_t _id;
6 changes: 6 additions & 0 deletions fe/fe-core/pom.xml
@@ -816,6 +816,12 @@ under the License.
<arg value="--java_out=${basedir}/target/generated-sources/proto"/>
<arg value="${starrocks.home}/gensrc/proto/internal_service.proto"/>
</exec>
<exec executable="${java.home}/bin/java">
<arg value="-jar"/>
<arg value="${settings.localRepository}/com/baidu/jprotobuf/${jprotobuf.version}/jprotobuf-${jprotobuf.version}-jar-with-dependencies.jar"/>
<arg value="--java_out=${basedir}/target/generated-sources/proto"/>
<arg value="${starrocks.home}/gensrc/proto/lake_delete.proto"/>
</exec>
<exec executable="${java.home}/bin/java">
<arg value="-jar"/>
<arg value="${settings.localRepository}/com/baidu/jprotobuf/${jprotobuf.version}/jprotobuf-${jprotobuf.version}-jar-with-dependencies.jar"/>
@@ -91,8 +91,6 @@ public void analyze(Analyzer analyzer) throws UserException {

tblName.analyze(analyzer);

CatalogUtils.checkIsLakeTable(tblName.getDb(), tblName.getTbl());

if (partitionNames != null) {
partitionNames.analyze(analyzer);
if (partitionNames.isTemp()) {
11 changes: 9 additions & 2 deletions fe/fe-core/src/main/java/com/starrocks/lake/Utils.java
@@ -12,6 +12,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -39,9 +40,15 @@ public static Long chooseBackend(LakeTablet tablet) {
// Preconditions: Has required the database's reader lock.
// Returns a map from backend ID to a list of tablet IDs.
public static Map<Long, List<Long>> groupTabletID(LakeTable table) throws NoAliveBackendException {
return groupTabletID(table.getPartitions(), MaterializedIndex.IndexExtState.ALL);
}

public static Map<Long, List<Long>> groupTabletID(Collection<Partition> partitions,
MaterializedIndex.IndexExtState indexState)
throws NoAliveBackendException {
Map<Long, List<Long>> groupMap = new HashMap<>();
for (Partition partition : table.getPartitions()) {
for (MaterializedIndex index : partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL)) {
for (Partition partition : partitions) {
for (MaterializedIndex index : partition.getMaterializedIndices(indexState)) {
for (Tablet tablet : index.getTablets()) {
Long beId = chooseBackend((LakeTablet) tablet);
if (beId == null) {
188 changes: 188 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/lake/delete/LakeDeleteJob.java
@@ -0,0 +1,188 @@
// This file is licensed under the Elastic License 2.0. Copyright 2021-present, StarRocks Limited.

package com.starrocks.lake.delete;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.starrocks.analysis.BinaryPredicate;
import com.starrocks.analysis.DeleteStmt;
import com.starrocks.analysis.InPredicate;
import com.starrocks.analysis.IsNullPredicate;
import com.starrocks.analysis.LiteralExpr;
import com.starrocks.analysis.Predicate;
import com.starrocks.analysis.SlotRef;
import com.starrocks.catalog.Database;
import com.starrocks.catalog.MaterializedIndex;
import com.starrocks.catalog.Partition;
import com.starrocks.catalog.Table;
import com.starrocks.common.Config;
import com.starrocks.common.DdlException;
import com.starrocks.common.UserException;
import com.starrocks.lake.Utils;
import com.starrocks.lake.proto.BinaryPredicatePB;
import com.starrocks.lake.proto.DeleteDataRequest;
import com.starrocks.lake.proto.DeleteDataResponse;
import com.starrocks.lake.proto.DeletePredicatePB;
import com.starrocks.lake.proto.InPredicatePB;
import com.starrocks.lake.proto.IsNullPredicatePB;
import com.starrocks.load.DeleteHandler;
import com.starrocks.load.DeleteJob;
import com.starrocks.load.MultiDeleteInfo;
import com.starrocks.qe.QueryStateException;
import com.starrocks.rpc.LakeServiceClient;
import com.starrocks.rpc.RpcException;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.system.Backend;
import com.starrocks.system.SystemInfoService;
import com.starrocks.thrift.TNetworkAddress;
import com.starrocks.transaction.TabletCommitInfo;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

/**
 * LakeDeleteJob is used to delete data from lake tables.
 * 1. Creates a DeletePredicatePB that the BE writes directly into tablet metadata.
 * 2. Does not use the old push task API.
 */
public class LakeDeleteJob extends DeleteJob {
private static final Logger LOG = LogManager.getLogger(LakeDeleteJob.class);

private Map<Long, List<Long>> beToTablets;

public LakeDeleteJob(long id, long transactionId, String label, MultiDeleteInfo deleteInfo) {
super(id, transactionId, label, deleteInfo);
beToTablets = Maps.newHashMap();
}

@Override
@java.lang.SuppressWarnings("squid:S2142") // allow catch InterruptedException
public void run(DeleteStmt stmt, Database db, Table table, List<Partition> partitions)
throws DdlException, QueryStateException {
Preconditions.checkState(table.isLakeTable());

db.readLock();
try {
beToTablets = Utils.groupTabletID(partitions, MaterializedIndex.IndexExtState.VISIBLE);
} catch (Throwable t) {
LOG.warn("error occurred during delete process", t);
// if transaction has been begun, need to abort it
if (GlobalStateMgr.getCurrentGlobalTransactionMgr()
.getTransactionState(db.getId(), getTransactionId()) != null) {
cancel(DeleteHandler.CancelType.UNKNOWN, t.getMessage());
}
throw new DdlException(t.getMessage(), t);
} finally {
db.readUnlock();
}

// create delete predicate
List<Predicate> conditions = stmt.getDeleteConditions();
DeletePredicatePB deletePredicate = createDeletePredicate(conditions);

// send delete data request to BE
try {
List<Future<DeleteDataResponse>> responseList = Lists.newArrayListWithCapacity(
beToTablets.size());
SystemInfoService systemInfoService = GlobalStateMgr.getCurrentSystemInfo();
for (Map.Entry<Long, List<Long>> entry : beToTablets.entrySet()) {
Backend backend = systemInfoService.getBackend(entry.getKey());
if (backend == null) {
throw new DdlException("Backend " + entry.getKey() + " has been dropped");
}
TNetworkAddress address = new TNetworkAddress();
address.setHostname(backend.getHost());
address.setPort(backend.getBrpcPort());

LakeServiceClient client = new LakeServiceClient(address);
DeleteDataRequest request = new DeleteDataRequest();
request.tabletIds = entry.getValue();
request.txnId = getTransactionId();
request.deletePredicate = deletePredicate;

Future<DeleteDataResponse> responseFuture = client.deleteData(request);
responseList.add(responseFuture);
}

for (Future<DeleteDataResponse> responseFuture : responseList) {
DeleteDataResponse response = responseFuture.get();
if (response != null && response.failedTablets != null && !response.failedTablets.isEmpty()) {
LOG.warn("Failed to execute delete. failed tablet: {}", response.failedTablets);
throw new DdlException("Failed to execute delete. failed tablet num: "
+ response.failedTablets.size());
}
}
} catch (RpcException | ExecutionException | InterruptedException e) {
cancel(DeleteHandler.CancelType.UNKNOWN, e.getMessage());
throw new DdlException(e.getMessage());
}

commit(db, getTimeoutMs());
}

private DeletePredicatePB createDeletePredicate(List<Predicate> conditions) {
DeletePredicatePB deletePredicate = new DeletePredicatePB();
deletePredicate.binaryPredicates = Lists.newArrayList();
deletePredicate.isNullPredicates = Lists.newArrayList();
deletePredicate.inPredicates = Lists.newArrayList();
for (Predicate condition : conditions) {
if (condition instanceof BinaryPredicate) {
BinaryPredicate binaryPredicate = (BinaryPredicate) condition;
BinaryPredicatePB binaryPredicatePB = new BinaryPredicatePB();
binaryPredicatePB.columnName = ((SlotRef) binaryPredicate.getChild(0)).getColumnName();
binaryPredicatePB.op = binaryPredicate.getOp().toString();
binaryPredicatePB.value = ((LiteralExpr) binaryPredicate.getChild(1)).getStringValue();
deletePredicate.binaryPredicates.add(binaryPredicatePB);
} else if (condition instanceof IsNullPredicate) {
IsNullPredicate isNullPredicate = (IsNullPredicate) condition;
IsNullPredicatePB isNullPredicatePB = new IsNullPredicatePB();
isNullPredicatePB.columnName = ((SlotRef) isNullPredicate.getChild(0)).getColumnName();
isNullPredicatePB.isNotNull = isNullPredicate.isNotNull();
deletePredicate.isNullPredicates.add(isNullPredicatePB);
} else if (condition instanceof InPredicate) {
InPredicate inPredicate = (InPredicate) condition;
InPredicatePB inPredicatePB = new InPredicatePB();
inPredicatePB.columnName = ((SlotRef) inPredicate.getChild(0)).getColumnName();
inPredicatePB.isNotIn = inPredicate.isNotIn();
inPredicatePB.values = Lists.newArrayList();
for (int i = 1; i <= inPredicate.getInElementNum(); i++) {
inPredicatePB.values.add(((LiteralExpr) inPredicate.getChild(i)).getStringValue());
}
deletePredicate.inPredicates.add(inPredicatePB);
}
}
return deletePredicate;
}

@Override
public long getTimeoutMs() {
long totalTablets = beToTablets.values().stream().flatMap(List::stream).count();
// timeout is between 30 seconds and 5 minutes
long timeout = Math.max(totalTablets * Config.tablet_delete_timeout_second * 1000L, 30000L);
return Math.min(timeout, Config.load_straggler_wait_second * 1000L);
}

@Override
public void clear() {
GlobalStateMgr.getCurrentState().getDeleteHandler().removeKillJob(getId());
}

@Override
public boolean commitImpl(Database db, long timeoutMs) throws UserException {
List<TabletCommitInfo> tabletCommitInfos = Lists.newArrayList();
for (Map.Entry<Long, List<Long>> entry : beToTablets.entrySet()) {
long backendId = entry.getKey();
for (long tabletId : entry.getValue()) {
tabletCommitInfos.add(new TabletCommitInfo(tabletId, backendId));
}
}

return GlobalStateMgr.getCurrentGlobalTransactionMgr()
.commitAndPublishTransaction(db, getTransactionId(), tabletCommitInfos, timeoutMs);
}
}
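
The DeleteHandler changes that route a DELETE statement on a lake table to this new job class are not among the files shown above. A plausible sketch of that dispatch follows; it is hypothetical except for the LakeDeleteJob constructor, DeleteJob.run(...), and Table.isLakeTable(), which do appear in this diff, and OlapDeleteJob's constructor arguments are assumed to mirror LakeDeleteJob's.

// Hypothetical sketch of DeleteHandler's job selection - not shown in this diff.
DeleteJob job;
if (table.isLakeTable()) {
    job = new LakeDeleteJob(jobId, transactionId, label, deleteInfo);  // new RPC-based path
} else {
    job = new OlapDeleteJob(jobId, transactionId, label, deleteInfo);  // existing push-task path, signature assumed
}
job.run(stmt, db, table, partitions);
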
@@ -61,6 +61,7 @@
import com.starrocks.common.MetaNotFoundException;
import com.starrocks.common.UserException;
import com.starrocks.load.DeleteJob;
import com.starrocks.load.OlapDeleteJob;
import com.starrocks.load.loadv2.SparkLoadJob;
import com.starrocks.persist.ReplicaPersistInfo;
import com.starrocks.rpc.FrontendServiceProxy;
@@ -448,13 +449,15 @@ private void finishRealtimePush(AgentTask task, TFinishTaskRequest request) {
if (deleteJob == null) {
throw new MetaNotFoundException("cannot find delete job, job[" + transactionId + "]");
}
Preconditions.checkState(deleteJob instanceof OlapDeleteJob);
OlapDeleteJob olapDeleteJob = (OlapDeleteJob) deleteJob;
for (int i = 0; i < tabletMetaList.size(); i++) {
TabletMeta tabletMeta = tabletMetaList.get(i);
long tabletId = tabletIds.get(i);
Replica replica = findRelatedReplica(olapTable, partition,
backendId, tabletId, tabletMeta.getIndexId());
if (replica != null) {
deleteJob.addFinishedReplica(partitionId, pushTabletId, replica);
olapDeleteJob.addFinishedReplica(partitionId, pushTabletId, replica);
pushTask.countDownLatch(backendId, pushTabletId);
}
}