Skip to content

Commit

Permalink
[Enhancement] Partial update support const expr (backport #50287) (#5…
Browse files Browse the repository at this point in the history
…0861)

Signed-off-by: sevev <[email protected]>
Co-authored-by: zhangqiang <[email protected]>
  • Loading branch information
mergify[bot] and sevev authored Sep 27, 2024
1 parent 00b4eac commit aef8019
Show file tree
Hide file tree
Showing 30 changed files with 263 additions and 28 deletions.
14 changes: 14 additions & 0 deletions be/src/exec/tablet_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ void OlapTableIndexSchema::to_protobuf(POlapTableIndexSchema* pindex) const {
if (column_param != nullptr) {
column_param->to_protobuf(pindex->mutable_column_param());
}
for (auto& [name, value] : column_to_expr_value) {
pindex->mutable_column_to_expr_value()->insert({name, value});
}
}

Status OlapTableSchemaParam::init(const POlapTableSchemaParam& pschema) {
Expand Down Expand Up @@ -120,6 +123,11 @@ Status OlapTableSchemaParam::init(const POlapTableSchemaParam& pschema) {
} else {
index->schema_id = p_index.id();
}

for (auto& entry : p_index.column_to_expr_value()) {
index->column_to_expr_value.insert({entry.first, entry.second});
}

_indexes.emplace_back(index);
}

Expand Down Expand Up @@ -173,6 +181,12 @@ Status OlapTableSchemaParam::init(const TOlapTableSchemaParam& tschema, RuntimeS
// schema id is same with index id in previous version, for compatibility
index->schema_id = t_index.id;
}

if (t_index.__isset.column_to_expr_value) {
for (auto& entry : t_index.column_to_expr_value) {
index->column_to_expr_value.insert({entry.first, entry.second});
}
}
_indexes.emplace_back(index);
}

Expand Down
2 changes: 2 additions & 0 deletions be/src/exec/tablet_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include <cstdint>
#include <memory>
#include <string>
#include <unordered_map>
#include <vector>

Expand Down Expand Up @@ -49,6 +50,7 @@ struct OlapTableIndexSchema {
int32_t schema_hash;
OlapTableColumnParam* column_param;
ExprContext* where_clause = nullptr;
std::map<std::string, std::string> column_to_expr_value;

void to_protobuf(POlapTableIndexSchema* pindex) const;
};
Expand Down
12 changes: 12 additions & 0 deletions be/src/runtime/lake_tablets_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ class LakeTabletsChannel : public TabletsChannel {
bool _is_incremental_channel{false};

std::set<int64_t> _immutable_partition_ids;
std::map<string, string> _column_to_expr_value;
};

LakeTabletsChannel::LakeTabletsChannel(LoadChannel* load_channel, lake::TabletManager* tablet_manager,
Expand Down Expand Up @@ -207,6 +208,16 @@ Status LakeTabletsChannel::open(const PTabletWriterOpenRequest& params, PTabletW
_num_remaining_senders.store(params.num_senders(), std::memory_order_release);
_num_initial_senders.store(params.num_senders(), std::memory_order_release);
}

for (auto& index_schema : params.schema().indexes()) {
if (index_schema.id() != _index_id) {
continue;
}
for (auto& entry : index_schema.column_to_expr_value()) {
_column_to_expr_value.insert({entry.first, entry.second});
}
}

RETURN_IF_ERROR(_create_delta_writers(params, false));

for (auto& [id, writer] : _delta_writers) {
Expand Down Expand Up @@ -544,6 +555,7 @@ Status LakeTabletsChannel::_create_delta_writers(const PTabletWriterOpenRequest&
.set_immutable_tablet_size(params.immutable_tablet_size())
.set_mem_tracker(_mem_tracker)
.set_schema_id(schema_id)
.set_column_to_expr_value(&_column_to_expr_value)
.build());
_delta_writers.emplace(tablet.tablet_id(), std::move(writer));
tablet_ids.emplace_back(tablet.tablet_id());
Expand Down
10 changes: 10 additions & 0 deletions be/src/runtime/local_tablets_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,14 @@ Status LocalTabletsChannel::open(const PTabletWriterOpenRequest& params, PTablet
_num_remaining_senders.store(params.num_senders(), std::memory_order_release);
_num_initial_senders.store(params.num_senders(), std::memory_order_release);
}
for (auto& index_schema : params.schema().indexes()) {
if (index_schema.id() != _index_id) {
continue;
}
for (auto& entry : index_schema.column_to_expr_value()) {
_column_to_expr_value.insert({entry.first, entry.second});
}
}

RETURN_IF_ERROR(_open_all_writers(params));

Expand Down Expand Up @@ -633,6 +641,7 @@ Status LocalTabletsChannel::_open_all_writers(const PTabletWriterOpenRequest& pa
options.write_quorum = params.write_quorum();
options.miss_auto_increment_column = params.miss_auto_increment_column();
options.ptable_schema_param = &(params.schema());
options.column_to_expr_value = &(_column_to_expr_value);
if (params.is_replicated_storage()) {
for (auto& replica : tablet.replicas()) {
options.replicas.emplace_back(replica);
Expand Down Expand Up @@ -824,6 +833,7 @@ Status LocalTabletsChannel::incremental_open(const PTabletWriterOpenRequest& par
options.miss_auto_increment_column = params.miss_auto_increment_column();
options.ptable_schema_param = &(params.schema());
options.immutable_tablet_size = params.immutable_tablet_size();
options.column_to_expr_value = &(_column_to_expr_value);
if (params.is_replicated_storage()) {
for (auto& replica : tablet.replicas()) {
options.replicas.emplace_back(replica);
Expand Down
1 change: 1 addition & 0 deletions be/src/runtime/local_tablets_channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ class LocalTabletsChannel : public TabletsChannel {
Status _status = Status::OK();

std::set<int64_t> _immutable_partition_ids;
std::map<string, string> _column_to_expr_value;
};

std::shared_ptr<TabletsChannel> new_local_tablets_channel(LoadChannel* load_channel, const TabletsChannelKey& key,
Expand Down
1 change: 1 addition & 0 deletions be/src/storage/delta_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,7 @@ Status DeltaWriter::_init() {
writer_context.full_tablet_schema = _tablet_schema;
writer_context.is_partial_update = true;
writer_context.partial_update_mode = _opt.partial_update_mode;
writer_context.column_to_expr_value = _opt.column_to_expr_value;
_tablet_schema = partial_update_schema;
} else {
if (_tablet_schema->keys_type() == KeysType::PRIMARY_KEYS && !_opt.merge_condition.empty()) {
Expand Down
1 change: 1 addition & 0 deletions be/src/storage/delta_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ struct DeltaWriterOptions {
// If you need to access it after initialization, please make sure the pointer is valid.
const POlapTableSchemaParam* ptable_schema_param = nullptr;
int64_t immutable_tablet_size = 0;
std::map<string, string>* column_to_expr_value = nullptr;
};

enum State {
Expand Down
1 change: 1 addition & 0 deletions be/src/storage/lake/async_delta_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,7 @@ StatusOr<AsyncDeltaWriterBuilder::AsyncDeltaWriterPtr> AsyncDeltaWriterBuilder::
.set_immutable_tablet_size(_immutable_tablet_size)
.set_miss_auto_increment_column(_miss_auto_increment_column)
.set_schema_id(_schema_id)
.set_column_to_expr_value(_column_to_expr_value)
.build());
auto impl = new AsyncDeltaWriterImpl(std::move(writer));
return std::make_unique<AsyncDeltaWriter>(impl);
Expand Down
6 changes: 6 additions & 0 deletions be/src/storage/lake/async_delta_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,11 @@ class AsyncDeltaWriterBuilder {
return *this;
}

// Registers the column-name -> expression-value map that build() forwards to the
// underlying DeltaWriterBuilder. Only the pointer is stored; the map is not copied.
// Returns *this so that builder calls can be chained.
// NOTE(review): presumably the caller must keep the map alive for as long as the
// built writer may read it -- confirm against the owning tablets channel.
AsyncDeltaWriterBuilder& set_column_to_expr_value(const std::map<std::string, std::string>* column_to_expr_value) {
    this->_column_to_expr_value = column_to_expr_value;
    return *this;
}

StatusOr<AsyncDeltaWriterPtr> build();

private:
Expand All @@ -188,6 +193,7 @@ class AsyncDeltaWriterBuilder {
MemTracker* _mem_tracker{nullptr};
std::string _merge_condition{};
bool _miss_auto_increment_column{false};
const std::map<std::string, std::string>* _column_to_expr_value{nullptr};
};

} // namespace starrocks::lake
16 changes: 13 additions & 3 deletions be/src/storage/lake/delta_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ class DeltaWriterImpl {
explicit DeltaWriterImpl(TabletManager* tablet_manager, int64_t tablet_id, int64_t txn_id, int64_t partition_id,
const std::vector<SlotDescriptor*>* slots, std::string merge_condition,
bool miss_auto_increment_column, int64_t table_id, int64_t immutable_tablet_size,
MemTracker* mem_tracker, int64_t max_buffer_size, int64_t schema_id)
MemTracker* mem_tracker, int64_t max_buffer_size, int64_t schema_id,
const std::map<string, string>* column_to_expr_value)
: _tablet_manager(tablet_manager),
_tablet_id(tablet_id),
_txn_id(txn_id),
Expand All @@ -88,7 +89,8 @@ class DeltaWriterImpl {
_max_buffer_size(max_buffer_size > 0 ? max_buffer_size : config::write_buffer_size),
_immutable_tablet_size(immutable_tablet_size),
_merge_condition(std::move(merge_condition)),
_miss_auto_increment_column(miss_auto_increment_column) {}
_miss_auto_increment_column(miss_auto_increment_column),
_column_to_expr_value(column_to_expr_value) {}

~DeltaWriterImpl() = default;

Expand Down Expand Up @@ -189,6 +191,8 @@ class DeltaWriterImpl {
bool _partial_schema_with_sort_key = false;

int64_t _last_write_ts = 0;

const std::map<string, string>* _column_to_expr_value = nullptr;
};

bool DeltaWriterImpl::is_immutable() const {
Expand Down Expand Up @@ -461,6 +465,12 @@ Status DeltaWriterImpl::finish(DeltaWriter::FinishMode mode) {
for (auto i = 0; i < op_write->rowset().segments_size(); i++) {
op_write->add_rewrite_segments(gen_segment_filename(_txn_id));
}
// handle partial update
if (_column_to_expr_value != nullptr) {
for (auto& [name, value] : (*_column_to_expr_value)) {
op_write->mutable_txn_meta()->mutable_column_to_expr_value()->insert({name, value});
}
}
}
// handle condition update
if (_merge_condition != "") {
Expand Down Expand Up @@ -719,7 +729,7 @@ StatusOr<DeltaWriterBuilder::DeltaWriterPtr> DeltaWriterBuilder::build() {
}
auto impl = new DeltaWriterImpl(_tablet_mgr, _tablet_id, _txn_id, _partition_id, _slots, _merge_condition,
_miss_auto_increment_column, _table_id, _immutable_tablet_size, _mem_tracker,
_max_buffer_size, _schema_id);
_max_buffer_size, _schema_id, _column_to_expr_value);
return std::make_unique<DeltaWriter>(impl);
}

Expand Down
6 changes: 6 additions & 0 deletions be/src/storage/lake/delta_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,11 @@ class DeltaWriterBuilder {
return *this;
}

// Remembers the column-name -> expression-value map that build() passes on to the
// DeltaWriterImpl constructor. The builder keeps just the pointer and makes no copy.
// Yields *this to allow fluent chaining with the other setters.
// NOTE(review): the pointed-to map presumably has to outlive the writer created by
// build() -- verify at the call sites.
DeltaWriterBuilder& set_column_to_expr_value(const std::map<std::string, std::string>* column_to_expr_value) {
    this->_column_to_expr_value = column_to_expr_value;
    return *this;
}

StatusOr<DeltaWriterPtr> build();

private:
Expand All @@ -189,6 +194,7 @@ class DeltaWriterBuilder {
MemTracker* _mem_tracker{nullptr};
int64_t _max_buffer_size{0};
bool _miss_auto_increment_column{false};
const std::map<std::string, std::string>* _column_to_expr_value{nullptr};
};

} // namespace starrocks::lake
1 change: 1 addition & 0 deletions be/src/storage/lake/pk_tablet_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ class HorizontalPkTabletWriter : public HorizontalGeneralTabletWriter {
private:
std::unique_ptr<RowsetTxnMetaPB> _rowset_txn_meta;
std::unique_ptr<RowsMapperBuilder> _rows_mapper_builder;
const std::map<std::string, std::string>* _column_to_expr_value = nullptr;
};

class VerticalPkTabletWriter : public VerticalGeneralTabletWriter {
Expand Down
16 changes: 10 additions & 6 deletions be/src/storage/lake/rowset_update_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ Status RowsetUpdateState::_prepare_auto_increment_partial_update_states(uint32_t
}

RETURN_IF_ERROR(params.tablet->update_mgr()->get_column_values(params, column_id, new_rows > 0, rowids_by_rssid,
&read_column,
&read_column, &_column_to_expr_value,
&_auto_increment_partial_update_states[segment_id]));

TRY_CATCH_BAD_ALLOC(_auto_increment_partial_update_states[segment_id].write_column->append_selective(
Expand Down Expand Up @@ -355,6 +355,10 @@ Status RowsetUpdateState::_prepare_partial_update_states(uint32_t segment_id, co
CHECK_MEM_LIMIT("RowsetUpdateState::_prepare_partial_update_states");
std::vector<ColumnId> read_column_ids = get_read_columns_ids(params.op_write, params.tablet_schema);

const auto& txn_meta = params.op_write.txn_meta();
for (auto& entry : txn_meta.column_to_expr_value()) {
_column_to_expr_value.insert({entry.first, entry.second});
}
auto read_column_schema = ChunkHelper::convert_schema(params.tablet_schema, read_column_ids);
// column list that need to read from source segment
std::vector<std::unique_ptr<Column>> read_columns;
Expand All @@ -378,8 +382,8 @@ Status RowsetUpdateState::_prepare_partial_update_states(uint32_t segment_id, co
plan_read_by_rssid(_partial_update_states[segment_id].src_rss_rowids, &num_default, &rowids_by_rssid, &idxes);
size_t total_rows = _partial_update_states[segment_id].src_rss_rowids.size();
// get column values by rowid, also get default values if needed
RETURN_IF_ERROR(params.tablet->update_mgr()->get_column_values(params, read_column_ids, num_default > 0,
rowids_by_rssid, &read_columns));
RETURN_IF_ERROR(params.tablet->update_mgr()->get_column_values(
params, read_column_ids, num_default > 0, rowids_by_rssid, &read_columns, &_column_to_expr_value));
for (size_t col_idx = 0; col_idx < read_column_ids.size(); col_idx++) {
TRY_CATCH_BAD_ALLOC(_partial_update_states[segment_id].write_columns[col_idx]->append_selective(
*read_columns[col_idx], idxes.data(), 0, idxes.size()));
Expand Down Expand Up @@ -572,8 +576,8 @@ Status RowsetUpdateState::_resolve_conflict_partial_update(const RowsetUpdateSta
std::vector<uint32_t> read_idxes;
plan_read_by_rssid(conflict_rowids, &num_default, &rowids_by_rssid, &read_idxes);
DCHECK_EQ(conflict_idxes.size(), read_idxes.size());
RETURN_IF_ERROR(params.tablet->update_mgr()->get_column_values(params, read_column_ids, num_default > 0,
rowids_by_rssid, &read_columns));
RETURN_IF_ERROR(params.tablet->update_mgr()->get_column_values(
params, read_column_ids, num_default > 0, rowids_by_rssid, &read_columns, &_column_to_expr_value));

for (size_t col_idx = 0; col_idx < read_column_ids.size(); col_idx++) {
std::unique_ptr<Column> new_write_column =
Expand Down Expand Up @@ -652,7 +656,7 @@ Status RowsetUpdateState::_resolve_conflict_auto_increment(const RowsetUpdateSta
auto_increment_read_column.resize(1);
auto_increment_read_column[0] = _auto_increment_partial_update_states[segment_id].write_column->clone_empty();
RETURN_IF_ERROR(params.tablet->update_mgr()->get_column_values(
params, column_id, new_rows > 0, rowids_by_rssid, &auto_increment_read_column,
params, column_id, new_rows > 0, rowids_by_rssid, &auto_increment_read_column, &_column_to_expr_value,
&_auto_increment_partial_update_states[segment_id]));

std::unique_ptr<Column> new_write_column =
Expand Down
1 change: 1 addition & 0 deletions be/src/storage/lake/rowset_update_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ class RowsetUpdateState {
// to be destructed after segment iters
OlapReaderStatistics _stats;
std::vector<ChunkIteratorPtr> _segment_iters;
std::map<std::string, std::string> _column_to_expr_value;
};

inline std::ostream& operator<<(std::ostream& os, const RowsetUpdateState& o) {
Expand Down
17 changes: 13 additions & 4 deletions be/src/storage/lake/update_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,7 @@ Status UpdateManager::get_rowids_from_pkindex(int64_t tablet_id, int64_t base_ve
Status UpdateManager::get_column_values(const RowsetUpdateStateParams& params, std::vector<uint32_t>& column_ids,
bool with_default, std::map<uint32_t, std::vector<uint32_t>>& rowids_by_rssid,
vector<std::unique_ptr<Column>>* columns,
const std::map<string, string>* column_to_expr_value,
AutoIncrementPartialUpdateState* auto_increment_state) {
TRACE_COUNTER_SCOPE_LATENCY_US("get_column_values_latency_us");
std::stringstream cost_str;
Expand All @@ -454,12 +455,20 @@ Status UpdateManager::get_column_values(const RowsetUpdateStateParams& params, s
if (with_default && auto_increment_state == nullptr) {
for (auto i = 0; i < column_ids.size(); ++i) {
const TabletColumn& tablet_column = params.tablet_schema->column(column_ids[i]);
if (tablet_column.has_default_value()) {
bool has_default_value = tablet_column.has_default_value();
std::string default_value = has_default_value ? tablet_column.default_value() : "";
if (column_to_expr_value != nullptr) {
auto iter = column_to_expr_value->find(std::string(tablet_column.name()));
if (iter != column_to_expr_value->end()) {
has_default_value = true;
default_value = iter->second;
}
}
if (has_default_value) {
const TypeInfoPtr& type_info = get_type_info(tablet_column);
std::unique_ptr<DefaultValueColumnIterator> default_value_iter =
std::make_unique<DefaultValueColumnIterator>(
tablet_column.has_default_value(), tablet_column.default_value(),
tablet_column.is_nullable(), type_info, tablet_column.length(), 1);
std::make_unique<DefaultValueColumnIterator>(true, default_value, tablet_column.is_nullable(),
type_info, tablet_column.length(), 1);
ColumnIteratorOptions iter_opts;
RETURN_IF_ERROR(default_value_iter->init(iter_opts));
RETURN_IF_ERROR(default_value_iter->fetch_values_by_rowid(nullptr, 1, (*columns)[i].get()));
Expand Down
1 change: 1 addition & 0 deletions be/src/storage/lake/update_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ class UpdateManager {
// Reads values for the columns listed in `column_ids` into `columns`.
// `column_to_expr_value` (optional, may be nullptr): map from column name to a value
// string. In the definition, when default values are produced (`with_default` is true
// and `auto_increment_state` is nullptr), an entry keyed by a column's name is used in
// place of that column's schema default value; a column with such an entry is treated
// as having a default even if the schema declares none.
// NOTE(review): `column_to_expr_value` was inserted before `auto_increment_state`, so
// callers that pass `auto_increment_state` positionally must also pass this argument.
Status get_column_values(const RowsetUpdateStateParams& params, std::vector<uint32_t>& column_ids,
bool with_default, std::map<uint32_t, std::vector<uint32_t>>& rowids_by_rssid,
vector<std::unique_ptr<Column>>* columns,
const std::map<string, string>* column_to_expr_value = nullptr,
AutoIncrementPartialUpdateState* auto_increment_state = nullptr);
// get delvec by version
Status get_del_vec(const TabletSegmentId& tsid, int64_t version, const MetaFileBuilder* builder, bool fill_cache,
Expand Down
5 changes: 5 additions & 0 deletions be/src/storage/rowset/rowset_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,11 @@ StatusOr<RowsetSharedPtr> RowsetWriter::build() {
}
// set partial update mode
_rowset_txn_meta_pb->set_partial_update_mode(_context.partial_update_mode);
if (_context.column_to_expr_value != nullptr) {
for (auto& [name, value] : (*_context.column_to_expr_value)) {
_rowset_txn_meta_pb->mutable_column_to_expr_value()->insert({name, value});
}
}
*_rowset_meta_pb->mutable_txn_meta() = *_rowset_txn_meta_pb;
} else if (!_context.merge_condition.empty()) {
_rowset_txn_meta_pb->set_merge_condition(_context.merge_condition);
Expand Down
1 change: 1 addition & 0 deletions be/src/storage/rowset/rowset_writer_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ class RowsetWriterContext {
PartialUpdateMode partial_update_mode = PartialUpdateMode::UNKNOWN_MODE;
// Is pk compaction output writer
bool is_pk_compaction = false;
std::map<string, string>* column_to_expr_value = nullptr;
};

} // namespace starrocks
Loading

0 comments on commit aef8019

Please sign in to comment.