Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Enhancement] Partial update support const expr #50287

Merged
merged 9 commits into from
Sep 9, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions be/src/exec/tablet_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ void OlapTableIndexSchema::to_protobuf(POlapTableIndexSchema* pindex) const {
if (column_param != nullptr) {
column_param->to_protobuf(pindex->mutable_column_param());
}
for (auto& [name, value] : column_to_expr_value) {
pindex->mutable_column_to_expr_value()->insert({name, value});
}
}

Status OlapTableSchemaParam::init(const POlapTableSchemaParam& pschema) {
Expand Down Expand Up @@ -120,6 +123,11 @@ Status OlapTableSchemaParam::init(const POlapTableSchemaParam& pschema) {
} else {
index->schema_id = p_index.id();
}

for (auto& entry : p_index.column_to_expr_value()) {
index->column_to_expr_value.insert({entry.first, entry.second});
}

_indexes.emplace_back(index);
}

Expand Down Expand Up @@ -173,6 +181,12 @@ Status OlapTableSchemaParam::init(const TOlapTableSchemaParam& tschema, RuntimeS
// schema id is same with index id in previous version, for compatibility
index->schema_id = t_index.id;
}

if (t_index.__isset.column_to_expr_value) {
for (auto& entry : t_index.column_to_expr_value) {
index->column_to_expr_value.insert({entry.first, entry.second});
}
}
_indexes.emplace_back(index);
}

Expand Down
2 changes: 2 additions & 0 deletions be/src/exec/tablet_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include <cstdint>
#include <memory>
#include <string>
#include <unordered_map>
#include <vector>

Expand Down Expand Up @@ -49,6 +50,7 @@ struct OlapTableIndexSchema {
int32_t schema_hash;
OlapTableColumnParam* column_param;
ExprContext* where_clause = nullptr;
std::map<std::string, std::string> column_to_expr_value;

void to_protobuf(POlapTableIndexSchema* pindex) const;
};
Expand Down
12 changes: 12 additions & 0 deletions be/src/runtime/lake_tablets_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ class LakeTabletsChannel : public TabletsChannel {
lake::DeltaWriterFinishMode _finish_mode{lake::DeltaWriterFinishMode::kWriteTxnLog};
TxnLogCollector _txn_log_collector;
std::set<int64_t> _immutable_partition_ids;
std::map<string, string> _column_to_expr_value;

// Profile counters
// Number of tablets
Expand Down Expand Up @@ -300,6 +301,16 @@ Status LakeTabletsChannel::open(const PTabletWriterOpenRequest& params, PTabletW
_num_remaining_senders.store(params.num_senders(), std::memory_order_release);
_num_initial_senders.store(params.num_senders(), std::memory_order_release);
}

for (auto& index_schema : params.schema().indexes()) {
if (index_schema.id() != _index_id) {
continue;
}
for (auto& entry : index_schema.column_to_expr_value()) {
_column_to_expr_value.insert({entry.first, entry.second});
}
}

RETURN_IF_ERROR(_create_delta_writers(params, false));

for (auto& [id, writer] : _delta_writers) {
Expand Down Expand Up @@ -673,6 +684,7 @@ Status LakeTabletsChannel::_create_delta_writers(const PTabletWriterOpenRequest&
.set_mem_tracker(_mem_tracker)
.set_schema_id(schema_id)
.set_partial_update_mode(params.partial_update_mode())
.set_column_to_expr_value(&_column_to_expr_value)
.build());
_delta_writers.emplace(tablet.tablet_id(), std::move(writer));
tablet_ids.emplace_back(tablet.tablet_id());
Expand Down
10 changes: 10 additions & 0 deletions be/src/runtime/local_tablets_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,14 @@ Status LocalTabletsChannel::open(const PTabletWriterOpenRequest& params, PTablet
_num_remaining_senders.store(params.num_senders(), std::memory_order_release);
_num_initial_senders.store(params.num_senders(), std::memory_order_release);
}
for (auto& index_schema : params.schema().indexes()) {
if (index_schema.id() != _index_id) {
continue;
}
for (auto& entry : index_schema.column_to_expr_value()) {
_column_to_expr_value.insert({entry.first, entry.second});
}
}

RETURN_IF_ERROR(_open_all_writers(params));

Expand Down Expand Up @@ -660,6 +668,7 @@ Status LocalTabletsChannel::_open_all_writers(const PTabletWriterOpenRequest& pa
options.write_quorum = params.write_quorum();
options.miss_auto_increment_column = params.miss_auto_increment_column();
options.ptable_schema_param = &(params.schema());
options.column_to_expr_value = &(_column_to_expr_value);
if (params.is_replicated_storage()) {
for (auto& replica : tablet.replicas()) {
options.replicas.emplace_back(replica);
Expand Down Expand Up @@ -857,6 +866,7 @@ Status LocalTabletsChannel::incremental_open(const PTabletWriterOpenRequest& par
options.miss_auto_increment_column = params.miss_auto_increment_column();
options.ptable_schema_param = &(params.schema());
options.immutable_tablet_size = params.immutable_tablet_size();
options.column_to_expr_value = &(_column_to_expr_value);
if (params.is_replicated_storage()) {
for (auto& replica : tablet.replicas()) {
options.replicas.emplace_back(replica);
Expand Down
2 changes: 2 additions & 0 deletions be/src/runtime/local_tablets_channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,8 @@ class LocalTabletsChannel : public TabletsChannel {

std::set<int64_t> _immutable_partition_ids;

std::map<string, string> _column_to_expr_value;

// Profile counters
// replicated_storage=false, the number of tablets
// replicated_storage=true, the number of primary tablets
Expand Down
1 change: 1 addition & 0 deletions be/src/storage/delta_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,7 @@ Status DeltaWriter::_init() {
writer_context.full_tablet_schema = _tablet_schema;
writer_context.is_partial_update = true;
writer_context.partial_update_mode = _opt.partial_update_mode;
writer_context.column_to_expr_value = _opt.column_to_expr_value;
_tablet_schema = partial_update_schema;
} else {
if (_tablet_schema->keys_type() == KeysType::PRIMARY_KEYS && !_opt.merge_condition.empty()) {
Expand Down
1 change: 1 addition & 0 deletions be/src/storage/delta_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ struct DeltaWriterOptions {
// If you need to access it after intialization, please make sure the pointer is valid.
const POlapTableSchemaParam* ptable_schema_param = nullptr;
int64_t immutable_tablet_size = 0;
std::map<string, string>* column_to_expr_value = nullptr;
};

enum State {
Expand Down
1 change: 1 addition & 0 deletions be/src/storage/lake/async_delta_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,7 @@ StatusOr<AsyncDeltaWriterBuilder::AsyncDeltaWriterPtr> AsyncDeltaWriterBuilder::
.set_miss_auto_increment_column(_miss_auto_increment_column)
.set_schema_id(_schema_id)
.set_partial_update_mode(_partial_update_mode)
.set_column_to_expr_value(_column_to_expr_value)
.build());
auto impl = new AsyncDeltaWriterImpl(std::move(writer));
return std::make_unique<AsyncDeltaWriter>(impl);
Expand Down
6 changes: 6 additions & 0 deletions be/src/storage/lake/async_delta_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,11 @@ class AsyncDeltaWriterBuilder {
return *this;
}

AsyncDeltaWriterBuilder& set_column_to_expr_value(const std::map<std::string, std::string>* column_to_expr_value) {
_column_to_expr_value = column_to_expr_value;
return *this;
}

StatusOr<AsyncDeltaWriterPtr> build();

private:
Expand All @@ -201,6 +206,7 @@ class AsyncDeltaWriterBuilder {
std::string _merge_condition{};
bool _miss_auto_increment_column{false};
PartialUpdateMode _partial_update_mode{PartialUpdateMode::ROW_MODE};
const std::map<std::string, std::string>* _column_to_expr_value{nullptr};
};

} // namespace starrocks::lake
15 changes: 12 additions & 3 deletions be/src/storage/lake/delta_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ class DeltaWriterImpl {
const std::vector<SlotDescriptor*>* slots, std::string merge_condition,
bool miss_auto_increment_column, int64_t table_id, int64_t immutable_tablet_size,
MemTracker* mem_tracker, int64_t max_buffer_size, int64_t schema_id,
const PartialUpdateMode& partial_update_mode)
const PartialUpdateMode& partial_update_mode,
const std::map<string, string>* column_to_expr_value)
: _tablet_manager(tablet_manager),
_tablet_id(tablet_id),
_txn_id(txn_id),
Expand All @@ -89,7 +90,8 @@ class DeltaWriterImpl {
_immutable_tablet_size(immutable_tablet_size),
_merge_condition(std::move(merge_condition)),
_miss_auto_increment_column(miss_auto_increment_column),
_partial_update_mode(partial_update_mode) {}
_partial_update_mode(partial_update_mode),
_column_to_expr_value(column_to_expr_value) {}

~DeltaWriterImpl() = default;

Expand Down Expand Up @@ -196,6 +198,8 @@ class DeltaWriterImpl {
bool _partial_schema_with_sort_key_conflict = false;

int64_t _last_write_ts = 0;

const std::map<string, string>* _column_to_expr_value = nullptr;
};

bool DeltaWriterImpl::is_immutable() const {
Expand Down Expand Up @@ -494,6 +498,11 @@ StatusOr<TxnLogPtr> DeltaWriterImpl::finish_with_txnlog(DeltaWriterFinishMode mo
}
// handle partial update
op_write->mutable_txn_meta()->set_partial_update_mode(_partial_update_mode);
if (_column_to_expr_value != nullptr) {
for (auto& [name, value] : (*_column_to_expr_value)) {
op_write->mutable_txn_meta()->mutable_column_to_expr_value()->insert({name, value});
}
}
}
// handle condition update
if (_merge_condition != "") {
Expand Down Expand Up @@ -762,7 +771,7 @@ StatusOr<DeltaWriterBuilder::DeltaWriterPtr> DeltaWriterBuilder::build() {
}
auto impl = new DeltaWriterImpl(_tablet_mgr, _tablet_id, _txn_id, _partition_id, _slots, _merge_condition,
_miss_auto_increment_column, _table_id, _immutable_tablet_size, _mem_tracker,
_max_buffer_size, _schema_id, _partial_update_mode);
_max_buffer_size, _schema_id, _partial_update_mode, _column_to_expr_value);
return std::make_unique<DeltaWriter>(impl);
}

sevev marked this conversation as resolved.
Show resolved Hide resolved
Expand Down
6 changes: 6 additions & 0 deletions be/src/storage/lake/delta_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,11 @@ class DeltaWriterBuilder {
return *this;
}

DeltaWriterBuilder& set_column_to_expr_value(const std::map<std::string, std::string>* column_to_expr_value) {
_column_to_expr_value = column_to_expr_value;
return *this;
}

StatusOr<DeltaWriterPtr> build();

private:
Expand All @@ -198,6 +203,7 @@ class DeltaWriterBuilder {
int64_t _max_buffer_size{0};
bool _miss_auto_increment_column{false};
PartialUpdateMode _partial_update_mode{PartialUpdateMode::ROW_MODE};
const std::map<std::string, std::string>* _column_to_expr_value{nullptr};
};

} // namespace starrocks::lake
1 change: 1 addition & 0 deletions be/src/storage/lake/pk_tablet_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ class HorizontalPkTabletWriter : public HorizontalGeneralTabletWriter {
private:
std::unique_ptr<RowsetTxnMetaPB> _rowset_txn_meta;
std::unique_ptr<RowsMapperBuilder> _rows_mapper_builder;
const std::map<std::string, std::string>* _column_to_expr_value = nullptr;
};

class VerticalPkTabletWriter : public VerticalGeneralTabletWriter {
Expand Down
16 changes: 10 additions & 6 deletions be/src/storage/lake/rowset_update_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ Status RowsetUpdateState::_prepare_auto_increment_partial_update_states(uint32_t
}

RETURN_IF_ERROR(params.tablet->update_mgr()->get_column_values(params, column_id, new_rows > 0, rowids_by_rssid,
&read_column,
&read_column, &_column_to_expr_value,
&_auto_increment_partial_update_states[segment_id]));

TRY_CATCH_BAD_ALLOC(_auto_increment_partial_update_states[segment_id].write_column->append_selective(
Expand Down Expand Up @@ -378,6 +378,10 @@ Status RowsetUpdateState::_prepare_partial_update_states(uint32_t segment_id, co
CHECK_MEM_LIMIT("RowsetUpdateState::_prepare_partial_update_states");
std::vector<ColumnId> read_column_ids = get_read_columns_ids(params.op_write, params.tablet_schema);

const auto& txn_meta = params.op_write.txn_meta();
for (auto& entry : txn_meta.column_to_expr_value()) {
_column_to_expr_value.insert({entry.first, entry.second});
}
auto read_column_schema = ChunkHelper::convert_schema(params.tablet_schema, read_column_ids);
// column list that need to read from source segment
std::vector<std::unique_ptr<Column>> read_columns;
Expand All @@ -401,8 +405,8 @@ Status RowsetUpdateState::_prepare_partial_update_states(uint32_t segment_id, co
plan_read_by_rssid(_partial_update_states[segment_id].src_rss_rowids, &num_default, &rowids_by_rssid, &idxes);
size_t total_rows = _partial_update_states[segment_id].src_rss_rowids.size();
// get column values by rowid, also get default values if needed
RETURN_IF_ERROR(params.tablet->update_mgr()->get_column_values(params, read_column_ids, num_default > 0,
rowids_by_rssid, &read_columns));
RETURN_IF_ERROR(params.tablet->update_mgr()->get_column_values(
params, read_column_ids, num_default > 0, rowids_by_rssid, &read_columns, &_column_to_expr_value));
for (size_t col_idx = 0; col_idx < read_column_ids.size(); col_idx++) {
TRY_CATCH_BAD_ALLOC(_partial_update_states[segment_id].write_columns[col_idx]->append_selective(
*read_columns[col_idx], idxes.data(), 0, idxes.size()));
Expand Down Expand Up @@ -608,8 +612,8 @@ Status RowsetUpdateState::_resolve_conflict_partial_update(const RowsetUpdateSta
std::vector<uint32_t> read_idxes;
plan_read_by_rssid(conflict_rowids, &num_default, &rowids_by_rssid, &read_idxes);
DCHECK_EQ(conflict_idxes.size(), read_idxes.size());
RETURN_IF_ERROR(params.tablet->update_mgr()->get_column_values(params, read_column_ids, num_default > 0,
rowids_by_rssid, &read_columns));
RETURN_IF_ERROR(params.tablet->update_mgr()->get_column_values(
params, read_column_ids, num_default > 0, rowids_by_rssid, &read_columns, &_column_to_expr_value));

for (size_t col_idx = 0; col_idx < read_column_ids.size(); col_idx++) {
std::unique_ptr<Column> new_write_column =
Expand Down Expand Up @@ -688,7 +692,7 @@ Status RowsetUpdateState::_resolve_conflict_auto_increment(const RowsetUpdateSta
auto_increment_read_column.resize(1);
auto_increment_read_column[0] = _auto_increment_partial_update_states[segment_id].write_column->clone_empty();
RETURN_IF_ERROR(params.tablet->update_mgr()->get_column_values(
params, column_id, new_rows > 0, rowids_by_rssid, &auto_increment_read_column,
params, column_id, new_rows > 0, rowids_by_rssid, &auto_increment_read_column, &_column_to_expr_value,
&_auto_increment_partial_update_states[segment_id]));

std::unique_ptr<Column> new_write_column =
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The most risky bug in this code is:
Potentially missing initialization or improper handling of _column_to_expr_value

You can modify the code like this:

@@ -328,6 +328,7 @@ Status RowsetUpdateState::_prepare_auto_increment_partial_update_states(uint32_t
     }
 
+    _column_to_expr_value.clear();  // Ensure it's empty before usage
     RETURN_IF_ERROR(params.tablet->update_mgr()->get_column_values(params, column_id, new_rows > 0, rowids_by_rssid,
                                                                    &read_column, &_column_to_expr_value,
                                                                    &_auto_increment_partial_update_states[segment_id]));

Expand Down
1 change: 1 addition & 0 deletions be/src/storage/lake/rowset_update_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ class RowsetUpdateState {
// to be destructed after segment iters
OlapReaderStatistics _stats;
std::vector<ChunkIteratorPtr> _segment_iters;
std::map<string, string> _column_to_expr_value;
};

inline std::ostream& operator<<(std::ostream& os, const RowsetUpdateState& o) {
Expand Down
17 changes: 13 additions & 4 deletions be/src/storage/lake/update_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,7 @@ Status UpdateManager::get_rowids_from_pkindex(int64_t tablet_id, int64_t base_ve
Status UpdateManager::get_column_values(const RowsetUpdateStateParams& params, std::vector<uint32_t>& column_ids,
bool with_default, std::map<uint32_t, std::vector<uint32_t>>& rowids_by_rssid,
vector<std::unique_ptr<Column>>* columns,
const std::map<string, string>* column_to_expr_value,
AutoIncrementPartialUpdateState* auto_increment_state) {
TRACE_COUNTER_SCOPE_LATENCY_US("get_column_values_latency_us");
std::stringstream cost_str;
Expand All @@ -500,12 +501,20 @@ Status UpdateManager::get_column_values(const RowsetUpdateStateParams& params, s
if (with_default && auto_increment_state == nullptr) {
for (auto i = 0; i < column_ids.size(); ++i) {
const TabletColumn& tablet_column = params.tablet_schema->column(column_ids[i]);
if (tablet_column.has_default_value()) {
bool has_default_value = tablet_column.has_default_value();
std::string default_value = has_default_value ? tablet_column.default_value() : "";
if (column_to_expr_value != nullptr) {
auto iter = column_to_expr_value->find(std::string(tablet_column.name()));
if (iter != column_to_expr_value->end()) {
has_default_value = true;
default_value = iter->second;
}
}
if (has_default_value) {
const TypeInfoPtr& type_info = get_type_info(tablet_column);
std::unique_ptr<DefaultValueColumnIterator> default_value_iter =
std::make_unique<DefaultValueColumnIterator>(
tablet_column.has_default_value(), tablet_column.default_value(),
tablet_column.is_nullable(), type_info, tablet_column.length(), 1);
std::make_unique<DefaultValueColumnIterator>(true, default_value, tablet_column.is_nullable(),
type_info, tablet_column.length(), 1);
ColumnIteratorOptions iter_opts;
RETURN_IF_ERROR(default_value_iter->init(iter_opts));
RETURN_IF_ERROR(default_value_iter->fetch_values_by_rowid(nullptr, 1, (*columns)[i].get()));
Expand Down
1 change: 1 addition & 0 deletions be/src/storage/lake/update_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ class UpdateManager {
Status get_column_values(const RowsetUpdateStateParams& params, std::vector<uint32_t>& column_ids,
bool with_default, std::map<uint32_t, std::vector<uint32_t>>& rowids_by_rssid,
vector<std::unique_ptr<Column>>* columns,
const std::map<string, string>* column_to_expr_value = nullptr,
AutoIncrementPartialUpdateState* auto_increment_state = nullptr);
// get delvec by version
Status get_del_vec(const TabletSegmentId& tsid, int64_t version, const MetaFileBuilder* builder, bool fill_cache,
Expand Down
6 changes: 6 additions & 0 deletions be/src/storage/rowset/rowset_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,12 @@ StatusOr<RowsetSharedPtr> RowsetWriter::build() {
}
// set partial update mode
_rowset_txn_meta_pb->set_partial_update_mode(_context.partial_update_mode);
if (_context.column_to_expr_value != nullptr) {
for (auto& [name, value] : (*_context.column_to_expr_value)) {
LOG(INFO) << "rowset writer set column";
sevev marked this conversation as resolved.
Show resolved Hide resolved
_rowset_txn_meta_pb->mutable_column_to_expr_value()->insert({name, value});
}
}
*_rowset_meta_pb->mutable_txn_meta() = *_rowset_txn_meta_pb;
} else if (!_context.merge_condition.empty()) {
_rowset_txn_meta_pb->set_merge_condition(_context.merge_condition);
Expand Down
2 changes: 2 additions & 0 deletions be/src/storage/rowset/rowset_writer_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ class RowsetWriterContext {
bool is_pk_compaction = false;
// is compaction job
bool is_compaction = false;

std::map<string, string>* column_to_expr_value = nullptr;
};

} // namespace starrocks
Loading
Loading