Skip to content

Commit

Permalink
[Enhancement] Partial update support const expr (backport #50287) (#5…
Browse files Browse the repository at this point in the history
…0860)

Co-authored-by: zhangqiang <[email protected]>
  • Loading branch information
mergify[bot] and sevev authored Sep 19, 2024
1 parent 95ed619 commit 2739a06
Show file tree
Hide file tree
Showing 30 changed files with 264 additions and 28 deletions.
14 changes: 14 additions & 0 deletions be/src/exec/tablet_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ void OlapTableIndexSchema::to_protobuf(POlapTableIndexSchema* pindex) const {
if (column_param != nullptr) {
column_param->to_protobuf(pindex->mutable_column_param());
}
for (auto& [name, value] : column_to_expr_value) {
pindex->mutable_column_to_expr_value()->insert({name, value});
}
}

Status OlapTableSchemaParam::init(const POlapTableSchemaParam& pschema) {
Expand Down Expand Up @@ -121,6 +124,11 @@ Status OlapTableSchemaParam::init(const POlapTableSchemaParam& pschema) {
} else {
index->schema_id = p_index.id();
}

for (auto& entry : p_index.column_to_expr_value()) {
index->column_to_expr_value.insert({entry.first, entry.second});
}

_indexes.emplace_back(index);
}

Expand Down Expand Up @@ -174,6 +182,12 @@ Status OlapTableSchemaParam::init(const TOlapTableSchemaParam& tschema, RuntimeS
// schema id is the same as the index id in previous versions; kept for compatibility
index->schema_id = t_index.id;
}

if (t_index.__isset.column_to_expr_value) {
for (auto& entry : t_index.column_to_expr_value) {
index->column_to_expr_value.insert({entry.first, entry.second});
}
}
_indexes.emplace_back(index);
}

Expand Down
2 changes: 2 additions & 0 deletions be/src/exec/tablet_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include <cstdint>
#include <memory>
#include <string>
#include <unordered_map>
#include <vector>

Expand Down Expand Up @@ -49,6 +50,7 @@ struct OlapTableIndexSchema {
int32_t schema_hash;
OlapTableColumnParam* column_param;
ExprContext* where_clause = nullptr;
std::map<std::string, std::string> column_to_expr_value;

void to_protobuf(POlapTableIndexSchema* pindex) const;
};
Expand Down
12 changes: 12 additions & 0 deletions be/src/runtime/lake_tablets_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ class LakeTabletsChannel : public TabletsChannel {
bool _is_incremental_channel{false};

std::set<int64_t> _immutable_partition_ids;
std::map<string, string> _column_to_expr_value;

// Profile counters
// Number of tablets
Expand Down Expand Up @@ -241,6 +242,16 @@ Status LakeTabletsChannel::open(const PTabletWriterOpenRequest& params, PTabletW
_num_remaining_senders.store(params.num_senders(), std::memory_order_release);
_num_initial_senders.store(params.num_senders(), std::memory_order_release);
}

for (auto& index_schema : params.schema().indexes()) {
if (index_schema.id() != _index_id) {
continue;
}
for (auto& entry : index_schema.column_to_expr_value()) {
_column_to_expr_value.insert({entry.first, entry.second});
}
}

RETURN_IF_ERROR(_create_delta_writers(params, false));

for (auto& [id, writer] : _delta_writers) {
Expand Down Expand Up @@ -590,6 +601,7 @@ Status LakeTabletsChannel::_create_delta_writers(const PTabletWriterOpenRequest&
.set_mem_tracker(_mem_tracker)
.set_schema_id(schema_id)
.set_partial_update_mode(params.partial_update_mode())
.set_column_to_expr_value(&_column_to_expr_value)
.build());
_delta_writers.emplace(tablet.tablet_id(), std::move(writer));
tablet_ids.emplace_back(tablet.tablet_id());
Expand Down
10 changes: 10 additions & 0 deletions be/src/runtime/local_tablets_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,14 @@ Status LocalTabletsChannel::open(const PTabletWriterOpenRequest& params, PTablet
_num_remaining_senders.store(params.num_senders(), std::memory_order_release);
_num_initial_senders.store(params.num_senders(), std::memory_order_release);
}
for (auto& index_schema : params.schema().indexes()) {
if (index_schema.id() != _index_id) {
continue;
}
for (auto& entry : index_schema.column_to_expr_value()) {
_column_to_expr_value.insert({entry.first, entry.second});
}
}

RETURN_IF_ERROR(_open_all_writers(params));

Expand Down Expand Up @@ -660,6 +668,7 @@ Status LocalTabletsChannel::_open_all_writers(const PTabletWriterOpenRequest& pa
options.write_quorum = params.write_quorum();
options.miss_auto_increment_column = params.miss_auto_increment_column();
options.ptable_schema_param = &(params.schema());
options.column_to_expr_value = &(_column_to_expr_value);
if (params.is_replicated_storage()) {
for (auto& replica : tablet.replicas()) {
options.replicas.emplace_back(replica);
Expand Down Expand Up @@ -857,6 +866,7 @@ Status LocalTabletsChannel::incremental_open(const PTabletWriterOpenRequest& par
options.miss_auto_increment_column = params.miss_auto_increment_column();
options.ptable_schema_param = &(params.schema());
options.immutable_tablet_size = params.immutable_tablet_size();
options.column_to_expr_value = &(_column_to_expr_value);
if (params.is_replicated_storage()) {
for (auto& replica : tablet.replicas()) {
options.replicas.emplace_back(replica);
Expand Down
2 changes: 2 additions & 0 deletions be/src/runtime/local_tablets_channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,8 @@ class LocalTabletsChannel : public TabletsChannel {

std::set<int64_t> _immutable_partition_ids;

std::map<string, string> _column_to_expr_value;

// Profile counters
// replicated_storage=false, the number of tablets
// replicated_storage=true, the number of primary tablets
Expand Down
1 change: 1 addition & 0 deletions be/src/storage/delta_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,7 @@ Status DeltaWriter::_init() {
writer_context.full_tablet_schema = _tablet_schema;
writer_context.is_partial_update = true;
writer_context.partial_update_mode = _opt.partial_update_mode;
writer_context.column_to_expr_value = _opt.column_to_expr_value;
_tablet_schema = partial_update_schema;
} else {
if (_tablet_schema->keys_type() == KeysType::PRIMARY_KEYS && !_opt.merge_condition.empty()) {
Expand Down
1 change: 1 addition & 0 deletions be/src/storage/delta_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ struct DeltaWriterOptions {
// If you need to access it after initialization, please make sure the pointer is valid.
const POlapTableSchemaParam* ptable_schema_param = nullptr;
int64_t immutable_tablet_size = 0;
std::map<string, string>* column_to_expr_value = nullptr;
};

enum State {
Expand Down
1 change: 1 addition & 0 deletions be/src/storage/lake/async_delta_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,7 @@ StatusOr<AsyncDeltaWriterBuilder::AsyncDeltaWriterPtr> AsyncDeltaWriterBuilder::
.set_miss_auto_increment_column(_miss_auto_increment_column)
.set_schema_id(_schema_id)
.set_partial_update_mode(_partial_update_mode)
.set_column_to_expr_value(_column_to_expr_value)
.build());
auto impl = new AsyncDeltaWriterImpl(std::move(writer));
return std::make_unique<AsyncDeltaWriter>(impl);
Expand Down
6 changes: 6 additions & 0 deletions be/src/storage/lake/async_delta_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,11 @@ class AsyncDeltaWriterBuilder {
return *this;
}

// Remembers the column-name -> expression-value map pointer for the writer being built.
// Only the pointer is stored; the map itself is not copied.
// NOTE(review): presumably the caller must keep the map alive for as long as the
// built writer may read it -- confirm against the channel that owns the map.
// Returns *this so calls can be chained.
AsyncDeltaWriterBuilder& set_column_to_expr_value(const std::map<std::string, std::string>* column_to_expr_value) {
    this->_column_to_expr_value = column_to_expr_value;
    return *this;
}

StatusOr<AsyncDeltaWriterPtr> build();

private:
Expand All @@ -195,6 +200,7 @@ class AsyncDeltaWriterBuilder {
std::string _merge_condition{};
bool _miss_auto_increment_column{false};
PartialUpdateMode _partial_update_mode{PartialUpdateMode::ROW_MODE};
const std::map<std::string, std::string>* _column_to_expr_value{nullptr};
};

} // namespace starrocks::lake
15 changes: 12 additions & 3 deletions be/src/storage/lake/delta_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ class DeltaWriterImpl {
const std::vector<SlotDescriptor*>* slots, std::string merge_condition,
bool miss_auto_increment_column, int64_t table_id, int64_t immutable_tablet_size,
MemTracker* mem_tracker, int64_t max_buffer_size, int64_t schema_id,
const PartialUpdateMode& partial_update_mode)
const PartialUpdateMode& partial_update_mode,
const std::map<string, string>* column_to_expr_value)
: _tablet_manager(tablet_manager),
_tablet_id(tablet_id),
_txn_id(txn_id),
Expand All @@ -91,7 +92,8 @@ class DeltaWriterImpl {
_immutable_tablet_size(immutable_tablet_size),
_merge_condition(std::move(merge_condition)),
_miss_auto_increment_column(miss_auto_increment_column),
_partial_update_mode(partial_update_mode) {}
_partial_update_mode(partial_update_mode),
_column_to_expr_value(column_to_expr_value) {}

~DeltaWriterImpl() = default;

Expand Down Expand Up @@ -196,6 +198,8 @@ class DeltaWriterImpl {
bool _partial_schema_with_sort_key_conflict = false;

int64_t _last_write_ts = 0;

const std::map<string, string>* _column_to_expr_value = nullptr;
};

bool DeltaWriterImpl::is_immutable() const {
Expand Down Expand Up @@ -491,6 +495,11 @@ Status DeltaWriterImpl::finish(DeltaWriter::FinishMode mode) {
}
// handle partial update
op_write->mutable_txn_meta()->set_partial_update_mode(_partial_update_mode);
if (_column_to_expr_value != nullptr) {
for (auto& [name, value] : (*_column_to_expr_value)) {
op_write->mutable_txn_meta()->mutable_column_to_expr_value()->insert({name, value});
}
}
}
// handle condition update
if (_merge_condition != "") {
Expand Down Expand Up @@ -749,7 +758,7 @@ StatusOr<DeltaWriterBuilder::DeltaWriterPtr> DeltaWriterBuilder::build() {
}
auto impl = new DeltaWriterImpl(_tablet_mgr, _tablet_id, _txn_id, _partition_id, _slots, _merge_condition,
_miss_auto_increment_column, _table_id, _immutable_tablet_size, _mem_tracker,
_max_buffer_size, _schema_id, _partial_update_mode);
_max_buffer_size, _schema_id, _partial_update_mode, _column_to_expr_value);
return std::make_unique<DeltaWriter>(impl);
}

Expand Down
6 changes: 6 additions & 0 deletions be/src/storage/lake/delta_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,11 @@ class DeltaWriterBuilder {
return *this;
}

// Remembers the column-name -> expression-value map pointer for the writer being built.
// Only the pointer is stored; the map itself is not copied.
// NOTE(review): presumably the caller must keep the map alive for as long as the
// built writer may read it -- confirm against the code that owns the map.
// Returns *this so calls can be chained.
DeltaWriterBuilder& set_column_to_expr_value(const std::map<std::string, std::string>* column_to_expr_value) {
    this->_column_to_expr_value = column_to_expr_value;
    return *this;
}

StatusOr<DeltaWriterPtr> build();

private:
Expand All @@ -196,6 +201,7 @@ class DeltaWriterBuilder {
int64_t _max_buffer_size{0};
bool _miss_auto_increment_column{false};
PartialUpdateMode _partial_update_mode{PartialUpdateMode::ROW_MODE};
const std::map<std::string, std::string>* _column_to_expr_value{nullptr};
};

} // namespace starrocks::lake
1 change: 1 addition & 0 deletions be/src/storage/lake/pk_tablet_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ class HorizontalPkTabletWriter : public HorizontalGeneralTabletWriter {
private:
std::unique_ptr<RowsetTxnMetaPB> _rowset_txn_meta;
std::unique_ptr<RowsMapperBuilder> _rows_mapper_builder;
const std::map<std::string, std::string>* _column_to_expr_value = nullptr;
};

class VerticalPkTabletWriter : public VerticalGeneralTabletWriter {
Expand Down
16 changes: 10 additions & 6 deletions be/src/storage/lake/rowset_update_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ Status RowsetUpdateState::_prepare_auto_increment_partial_update_states(uint32_t
}

RETURN_IF_ERROR(params.tablet->update_mgr()->get_column_values(params, column_id, new_rows > 0, rowids_by_rssid,
&read_column,
&read_column, &_column_to_expr_value,
&_auto_increment_partial_update_states[segment_id]));

TRY_CATCH_BAD_ALLOC(_auto_increment_partial_update_states[segment_id].write_column->append_selective(
Expand Down Expand Up @@ -378,6 +378,10 @@ Status RowsetUpdateState::_prepare_partial_update_states(uint32_t segment_id, co
CHECK_MEM_LIMIT("RowsetUpdateState::_prepare_partial_update_states");
std::vector<ColumnId> read_column_ids = get_read_columns_ids(params.op_write, params.tablet_schema);

const auto& txn_meta = params.op_write.txn_meta();
for (auto& entry : txn_meta.column_to_expr_value()) {
_column_to_expr_value.insert({entry.first, entry.second});
}
auto read_column_schema = ChunkHelper::convert_schema(params.tablet_schema, read_column_ids);
// column list that need to read from source segment
std::vector<std::unique_ptr<Column>> read_columns;
Expand All @@ -401,8 +405,8 @@ Status RowsetUpdateState::_prepare_partial_update_states(uint32_t segment_id, co
plan_read_by_rssid(_partial_update_states[segment_id].src_rss_rowids, &num_default, &rowids_by_rssid, &idxes);
size_t total_rows = _partial_update_states[segment_id].src_rss_rowids.size();
// get column values by rowid, also get default values if needed
RETURN_IF_ERROR(params.tablet->update_mgr()->get_column_values(params, read_column_ids, num_default > 0,
rowids_by_rssid, &read_columns));
RETURN_IF_ERROR(params.tablet->update_mgr()->get_column_values(
params, read_column_ids, num_default > 0, rowids_by_rssid, &read_columns, &_column_to_expr_value));
for (size_t col_idx = 0; col_idx < read_column_ids.size(); col_idx++) {
TRY_CATCH_BAD_ALLOC(_partial_update_states[segment_id].write_columns[col_idx]->append_selective(
*read_columns[col_idx], idxes.data(), 0, idxes.size()));
Expand Down Expand Up @@ -608,8 +612,8 @@ Status RowsetUpdateState::_resolve_conflict_partial_update(const RowsetUpdateSta
std::vector<uint32_t> read_idxes;
plan_read_by_rssid(conflict_rowids, &num_default, &rowids_by_rssid, &read_idxes);
DCHECK_EQ(conflict_idxes.size(), read_idxes.size());
RETURN_IF_ERROR(params.tablet->update_mgr()->get_column_values(params, read_column_ids, num_default > 0,
rowids_by_rssid, &read_columns));
RETURN_IF_ERROR(params.tablet->update_mgr()->get_column_values(
params, read_column_ids, num_default > 0, rowids_by_rssid, &read_columns, &_column_to_expr_value));

for (size_t col_idx = 0; col_idx < read_column_ids.size(); col_idx++) {
std::unique_ptr<Column> new_write_column =
Expand Down Expand Up @@ -688,7 +692,7 @@ Status RowsetUpdateState::_resolve_conflict_auto_increment(const RowsetUpdateSta
auto_increment_read_column.resize(1);
auto_increment_read_column[0] = _auto_increment_partial_update_states[segment_id].write_column->clone_empty();
RETURN_IF_ERROR(params.tablet->update_mgr()->get_column_values(
params, column_id, new_rows > 0, rowids_by_rssid, &auto_increment_read_column,
params, column_id, new_rows > 0, rowids_by_rssid, &auto_increment_read_column, &_column_to_expr_value,
&_auto_increment_partial_update_states[segment_id]));

std::unique_ptr<Column> new_write_column =
Expand Down
1 change: 1 addition & 0 deletions be/src/storage/lake/rowset_update_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ class RowsetUpdateState {
// to be destructed after segment iters
OlapReaderStatistics _stats;
std::vector<ChunkIteratorPtr> _segment_iters;
std::map<string, string> _column_to_expr_value;
};

inline std::ostream& operator<<(std::ostream& os, const RowsetUpdateState& o) {
Expand Down
17 changes: 13 additions & 4 deletions be/src/storage/lake/update_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,7 @@ Status UpdateManager::get_rowids_from_pkindex(int64_t tablet_id, int64_t base_ve
Status UpdateManager::get_column_values(const RowsetUpdateStateParams& params, std::vector<uint32_t>& column_ids,
bool with_default, std::map<uint32_t, std::vector<uint32_t>>& rowids_by_rssid,
vector<std::unique_ptr<Column>>* columns,
const std::map<string, string>* column_to_expr_value,
AutoIncrementPartialUpdateState* auto_increment_state) {
TRACE_COUNTER_SCOPE_LATENCY_US("get_column_values_latency_us");
std::stringstream cost_str;
Expand All @@ -500,12 +501,20 @@ Status UpdateManager::get_column_values(const RowsetUpdateStateParams& params, s
if (with_default && auto_increment_state == nullptr) {
for (auto i = 0; i < column_ids.size(); ++i) {
const TabletColumn& tablet_column = params.tablet_schema->column(column_ids[i]);
if (tablet_column.has_default_value()) {
bool has_default_value = tablet_column.has_default_value();
std::string default_value = has_default_value ? tablet_column.default_value() : "";
if (column_to_expr_value != nullptr) {
auto iter = column_to_expr_value->find(std::string(tablet_column.name()));
if (iter != column_to_expr_value->end()) {
has_default_value = true;
default_value = iter->second;
}
}
if (has_default_value) {
const TypeInfoPtr& type_info = get_type_info(tablet_column);
std::unique_ptr<DefaultValueColumnIterator> default_value_iter =
std::make_unique<DefaultValueColumnIterator>(
tablet_column.has_default_value(), tablet_column.default_value(),
tablet_column.is_nullable(), type_info, tablet_column.length(), 1);
std::make_unique<DefaultValueColumnIterator>(true, default_value, tablet_column.is_nullable(),
type_info, tablet_column.length(), 1);
ColumnIteratorOptions iter_opts;
RETURN_IF_ERROR(default_value_iter->init(iter_opts));
RETURN_IF_ERROR(default_value_iter->fetch_values_by_rowid(nullptr, 1, (*columns)[i].get()));
Expand Down
1 change: 1 addition & 0 deletions be/src/storage/lake/update_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ class UpdateManager {
Status get_column_values(const RowsetUpdateStateParams& params, std::vector<uint32_t>& column_ids,
bool with_default, std::map<uint32_t, std::vector<uint32_t>>& rowids_by_rssid,
vector<std::unique_ptr<Column>>* columns,
const std::map<string, string>* column_to_expr_value = nullptr,
AutoIncrementPartialUpdateState* auto_increment_state = nullptr);
// get delvec by version
Status get_del_vec(const TabletSegmentId& tsid, int64_t version, const MetaFileBuilder* builder, bool fill_cache,
Expand Down
5 changes: 5 additions & 0 deletions be/src/storage/rowset/rowset_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,11 @@ StatusOr<RowsetSharedPtr> RowsetWriter::build() {
}
// set partial update mode
_rowset_txn_meta_pb->set_partial_update_mode(_context.partial_update_mode);
if (_context.column_to_expr_value != nullptr) {
for (auto& [name, value] : (*_context.column_to_expr_value)) {
_rowset_txn_meta_pb->mutable_column_to_expr_value()->insert({name, value});
}
}
*_rowset_meta_pb->mutable_txn_meta() = *_rowset_txn_meta_pb;
} else if (!_context.merge_condition.empty()) {
_rowset_txn_meta_pb->set_merge_condition(_context.merge_condition);
Expand Down
2 changes: 2 additions & 0 deletions be/src/storage/rowset/rowset_writer_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ class RowsetWriterContext {
bool is_pk_compaction = false;
// is compaction job
bool is_compaction = false;

std::map<string, string>* column_to_expr_value = nullptr;
};

} // namespace starrocks
Loading

0 comments on commit 2739a06

Please sign in to comment.