[BugFix] fix column overflow when handle too large partial update (backport #49054) (#49477)

Signed-off-by: Yixin Luo <[email protected]>
Co-authored-by: Yixin Luo <[email protected]>
mergify[bot] and luohaha authored Aug 8, 2024
1 parent 2bf72d8 commit 1ea843e
Showing 9 changed files with 156 additions and 38 deletions.
2 changes: 2 additions & 0 deletions be/src/common/config.h
@@ -321,6 +321,8 @@ CONF_mInt64(update_compaction_result_bytes, "1073741824");
CONF_mInt32(update_compaction_delvec_file_io_amp_ratio, "2");
// This config defines the maximum percentage of data allowed per compaction
CONF_mDouble(update_compaction_ratio_threshold, "0.5");
// This config controls the maximum memory each worker can use for partial update.
CONF_mInt64(partial_update_memory_limit_per_worker, "2147483648"); // 2GB

CONF_mInt32(repair_compaction_interval_seconds, "600"); // 10 min
CONF_Int32(manual_compaction_threads, "4");
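For scale: the streaming reader added in rowset_column_update_state.cpp below divides this budget by an estimated per-row-per-column cost to decide when to flush a batch. A minimal standalone sketch of that arithmetic, with all figures invented for illustration:

#include <cstdint>
#include <iostream>

int main() {
    // Assumed inputs, not taken from the commit:
    const int64_t total_update_row_size = 1LL << 30; // bytes of update data in the rowset
    const int64_t num_rows_upt = 8000000;            // rows written by update files
    const int64_t updated_cols = 4;                  // partial update column count
    // Mirrors calc_upt_memory_usage_per_row_column() below:
    const int64_t per_row_col = total_update_row_size / (num_rows_upt * updated_cols); // ~33 bytes
    const int64_t budget = 2147483648LL;             // partial_update_memory_limit_per_worker
    const int64_t read_cols = 10;                    // schema.num_fields() of the read schema (assumed)
    // Mirrors the flush condition in read_from_source_segment_and_update() below:
    std::cout << "flush after ~" << budget / (per_row_col * read_cols) << " rows\n";
    return 0;
}

Before this change the whole source segment was read into a single chunk, so a large enough segment could exhaust memory or push a Column past the INT32_MAX row limit; the budget bounds both.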
8 changes: 5 additions & 3 deletions be/src/storage/rowset/horizontal_update_rowset_writer.cpp
@@ -58,12 +58,15 @@ Status HorizontalUpdateRowsetWriter::add_chunk(const Chunk& chunk) {
RETURN_IF_ERROR(_update_file_writer->finalize(&segment_size, &index_size, &footer_position));
{
std::lock_guard<std::mutex> l(_lock);
_num_rows_upt += _update_file_writer->num_rows_written();
_num_uptfile++;
_total_update_row_size += static_cast<int64_t>(chunk.bytes_usage());
}
ASSIGN_OR_RETURN(_update_file_writer, _create_update_file_writer());
}
{
std::lock_guard<std::mutex> l(_lock);
_num_rows_upt += chunk.num_rows();
_total_update_row_size += static_cast<int64_t>(chunk.bytes_usage());
}

return _update_file_writer->append_chunk(chunk);
}
@@ -105,7 +108,6 @@ Status HorizontalUpdateRowsetWriter::flush() {
RETURN_IF_ERROR(_update_file_writer->finalize(&segment_size, &index_size, &footer_position));
{
std::lock_guard<std::mutex> l(_lock);
_num_rows_upt += _update_file_writer->num_rows_written();
_num_uptfile++;
}
_update_file_writer.reset();
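The effect of this hunk is that _num_rows_upt and _total_update_row_size are accumulated once per appended chunk rather than only when a segment file is finalized, so rows buffered in a writer that never reaches another finalize are still counted. A condensed sketch of the pattern (simplified types, not the real class):

#include <cstdint>
#include <mutex>

struct ChunkStats { int64_t rows = 0; int64_t bytes = 0; };

class UpdateWriterSketch {
public:
    // Count every chunk as it arrives, not at segment finalize time, so the
    // rowset-level statistics are complete when build() snapshots them.
    void add_chunk(const ChunkStats& chunk) {
        std::lock_guard<std::mutex> l(_lock);
        _num_rows_upt += chunk.rows;
        _total_update_row_size += chunk.bytes;
    }
    int64_t num_rows_upt() const {
        std::lock_guard<std::mutex> l(_lock);
        return _num_rows_upt;
    }
private:
    mutable std::mutex _lock;
    int64_t _num_rows_upt = 0;
    int64_t _total_update_row_size = 0;
};

These counters feed the new num_rows_upt rowset meta field that the memory estimate in rowset_column_update_state.cpp below depends on.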
3 changes: 2 additions & 1 deletion be/src/storage/rowset/rowset.h
@@ -228,8 +228,9 @@ class Rowset : public std::enable_shared_from_this<Rowset>, public BaseRowset {
size_t data_disk_size() const { return rowset_meta()->total_disk_size(); }
bool empty() const { return rowset_meta()->empty(); }
int64_t num_rows() const override { return rowset_meta()->num_rows(); }
int64_t num_rows_upt() const { return rowset_meta()->num_rows_upt(); }
size_t total_row_size() const { return rowset_meta()->total_row_size(); }
size_t total_update_row_size() const { return rowset_meta()->total_update_row_size(); }
int64_t total_update_row_size() const { return rowset_meta()->total_update_row_size(); }
Version version() const { return rowset_meta()->version(); }
RowsetId rowset_id() const override { return rowset_meta()->rowset_id(); }
std::string rowset_id_str() const { return rowset_meta()->rowset_id().to_string(); }
2 changes: 2 additions & 0 deletions be/src/storage/rowset/rowset_meta.h
@@ -92,6 +92,8 @@ class RowsetMeta {

int64_t num_rows() const { return _rowset_meta_pb->num_rows(); }

int64_t num_rows_upt() const { return _rowset_meta_pb->num_rows_upt(); }

void set_num_rows(int64_t num_rows) { _rowset_meta_pb->set_num_rows(num_rows); }

int64_t total_row_size() { return _rowset_meta_pb->total_row_size(); }
1 change: 1 addition & 0 deletions be/src/storage/rowset/rowset_writer.cpp
@@ -164,6 +164,7 @@ StatusOr<RowsetSharedPtr> RowsetWriter::build() {
_rowset_meta_pb->set_num_delete_files(_num_delfile);
_rowset_meta_pb->set_num_update_files(_num_uptfile);
_rowset_meta_pb->set_total_update_row_size(_total_update_row_size);
_rowset_meta_pb->set_num_rows_upt(_num_rows_upt);
if (_num_segment <= 1) {
_rowset_meta_pb->set_segments_overlap_pb(NONOVERLAPPING);
}
110 changes: 77 additions & 33 deletions be/src/storage/rowset_column_update_state.cpp
@@ -300,11 +300,25 @@ Status RowsetColumnUpdateState::_finalize_partial_update_state(Tablet* tablet, R
return Status::OK();
}

static StatusOr<ChunkPtr> read_from_source_segment(Rowset* rowset, const Schema& schema, Tablet* tablet,
OlapReaderStatistics* stats, int64_t version,
RowsetSegmentId rowset_seg_id, const std::string& path) {
static int64_t calc_upt_memory_usage_per_row_column(Rowset* rowset) {
const auto& txn_meta = rowset->rowset_meta()->get_meta_pb_without_schema().txn_meta();
const int64_t total_update_col_cnt = txn_meta.partial_update_column_ids_size();
// `num_rows_upt` and `total_update_row_size` can be zero after upgrading from an old version;
// in that case return zero, meaning no limit.
if ((rowset->num_rows_upt() * total_update_col_cnt) <= 0) return 0;
return rowset->total_update_row_size() / (rowset->num_rows_upt() * total_update_col_cnt);
}

// Read chunks from the source segment file and call `update_func` to apply updates.
// `update_func` accepts a StreamChunkContainer carrying the chunk and its [start_rowid, end_rowid) row range.
static Status read_from_source_segment_and_update(Rowset* rowset, const Schema& schema, Tablet* tablet,
OlapReaderStatistics* stats, int64_t version,
RowsetSegmentId rowset_seg_id, const std::string& path,
const std::function<Status(StreamChunkContainer)>& update_func) {
CHECK_MEM_LIMIT("RowsetColumnUpdateState::read_from_source_segment");
ASSIGN_OR_RETURN(auto fs, FileSystem::CreateSharedFromString(rowset->rowset_path()));
// Estimate the size of each updated row before the update is actually applied.
const int64_t upt_memory_usage_per_row_column = calc_upt_memory_usage_per_row_column(rowset);
auto segment = Segment::open(fs, FileInfo{path}, rowset_seg_id.segment_id, rowset->schema());
if (!segment.ok()) {
LOG(WARNING) << "Fail to open " << path << ": " << segment.status();
@@ -322,11 +336,13 @@ static StatusOr<ChunkPtr> read_from_source_segment(Rowset* rowset, const Schema&
seg_options.version = version;
// not use delvec loader
seg_options.dcg_loader = std::make_shared<LocalDeltaColumnGroupLoader>(tablet->data_dir()->get_meta());
seg_options.chunk_size = config::vector_chunk_size;
ASSIGN_OR_RETURN(auto seg_iter, (*segment)->new_iterator(schema, seg_options));
ChunkUniquePtr source_chunk_ptr;
ChunkUniquePtr tmp_chunk_ptr;
TRY_CATCH_BAD_ALLOC(source_chunk_ptr = ChunkHelper::new_chunk(schema, (*segment)->num_rows()));
TRY_CATCH_BAD_ALLOC(tmp_chunk_ptr = ChunkHelper::new_chunk(schema, 1024));
TRY_CATCH_BAD_ALLOC(source_chunk_ptr = ChunkHelper::new_chunk(schema, config::vector_chunk_size));
TRY_CATCH_BAD_ALLOC(tmp_chunk_ptr = ChunkHelper::new_chunk(schema, config::vector_chunk_size));
uint32_t start_rowid = 0;
while (true) {
tmp_chunk_ptr->reset();
auto st = seg_iter->get_next(tmp_chunk_ptr.get());
@@ -336,9 +352,30 @@ static StatusOr<ChunkPtr> read_from_source_segment(Rowset* rowset, const Schema&
return st;
} else {
source_chunk_ptr->append(*tmp_chunk_ptr);
// To avoid excessive memory usage and Column overflow, limit the source chunk's size.
if (source_chunk_ptr->num_rows() >= INT32_MAX ||
(int64_t)source_chunk_ptr->num_rows() * upt_memory_usage_per_row_column * (int64_t)schema.num_fields() >
config::partial_update_memory_limit_per_worker) {
StreamChunkContainer container = {
.chunk_ptr = source_chunk_ptr.get(),
.start_rowid = start_rowid,
.end_rowid = start_rowid + static_cast<uint32_t>(source_chunk_ptr->num_rows())};
RETURN_IF_ERROR(update_func(container));
start_rowid += static_cast<uint32_t>(source_chunk_ptr->num_rows());
source_chunk_ptr->reset();
}
}
}
return source_chunk_ptr;
if (!source_chunk_ptr->is_empty()) {
StreamChunkContainer container = {
.chunk_ptr = source_chunk_ptr.get(),
.start_rowid = start_rowid,
.end_rowid = start_rowid + static_cast<uint32_t>(source_chunk_ptr->num_rows())};
RETURN_IF_ERROR(update_func(container));
start_rowid += static_cast<uint32_t>(source_chunk_ptr->num_rows());
source_chunk_ptr->reset();
}
return Status::OK();
}
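
The loop above is the heart of the fix: instead of materializing the whole segment, it hands bounded [start_rowid, end_rowid) batches to update_func. A self-contained toy model of that control flow (invented sizes, no StarRocks types):

#include <cstdint>
#include <functional>
#include <iostream>

// Accumulate rows from a source and hand off [start, end) batches through a
// callback whenever the estimated footprint would exceed the budget.
void stream_in_batches(int64_t total_rows, int64_t bytes_per_row, int64_t budget,
                       const std::function<void(int64_t, int64_t)>& flush) {
    int64_t start = 0, buffered = 0, row = 0;
    for (; row < total_rows; ++row) {
        buffered += bytes_per_row;
        if (buffered >= budget) {           // flush condition, cf. the code above
            flush(start, row + 1);          // batch covers rows [start, row]
            start = row + 1;
            buffered = 0;
        }
    }
    if (row > start) flush(start, row);     // final partial batch
}

int main() {
    stream_in_batches(/*total_rows=*/10, /*bytes_per_row=*/400, /*budget=*/1000,
                      [](int64_t s, int64_t e) {
                          std::cout << "batch [" << s << ", " << e << ")\n";
                      });
}

Running it prints batches [0, 3), [3, 6), [6, 9), [9, 10): every batch stays under the budget, and the trailing remainder is flushed at the end, just as the non-empty source_chunk_ptr is flushed after the read loop above.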

// this function build delta writer for delta column group's file.(end with `.col`)
@@ -382,7 +419,8 @@ static Status read_chunk_from_update_file(const ChunkIteratorPtr& iter, const Ch
// inorder_upt_rowids -> <2, 3, 4>, <5, 6>
static void cut_rowids_in_order(const std::vector<RowidPairs>& rowid_pairs,
std::vector<std::vector<uint32_t>>* inorder_source_rowids,
std::vector<std::vector<uint32_t>>* inorder_upt_rowids) {
std::vector<std::vector<uint32_t>>* inorder_upt_rowids,
StreamChunkContainer container) {
uint32_t last_source_rowid = 0;
std::vector<uint32_t> current_source_rowids;
std::vector<uint32_t> current_upt_rowids;
@@ -393,11 +431,16 @@ static void cut_rowids_in_order(const std::vector<RowidPairs>& rowid_pairs,
inorder_upt_rowids->back().swap(current_upt_rowids);
};
for (const auto& each : rowid_pairs) {
if (!container.contains(each.first)) {
// skip in this round
continue;
}
if (each.first < last_source_rowid) {
// cut
cut_rowids_fn();
}
current_source_rowids.push_back(each.first);
// Align the source rowid to the container-local index
current_source_rowids.push_back(each.first - container.start_rowid);
current_upt_rowids.push_back(each.second);
last_source_rowid = each.first;
}
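
Per the (partly collapsed) comment above, the pairs are cut into runs whose source rowids stay ascending, so each run can be applied with one in-order update_rows call. A standalone rendering of the cut logic with invented input:

#include <cstdint>
#include <iostream>
#include <utility>
#include <vector>

int main() {
    // <source rowid, update rowid> pairs; source order breaks between 9 and 3.
    std::vector<std::pair<uint32_t, uint32_t>> pairs = {{7, 0}, {9, 1}, {3, 2}, {5, 3}};
    std::vector<std::vector<uint32_t>> runs;
    std::vector<uint32_t> current;
    uint32_t last = 0;
    for (const auto& p : pairs) {
        if (p.first < last && !current.empty()) { // source rowid went backwards: cut
            runs.emplace_back();
            runs.back().swap(current);
        }
        current.push_back(p.first);
        last = p.first;
    }
    if (!current.empty()) {
        runs.emplace_back();
        runs.back().swap(current);
    }
    for (const auto& r : runs) {   // prints: [7 9] [3 5]
        std::cout << '[';
        for (size_t i = 0; i < r.size(); ++i) std::cout << (i ? " " : "") << r[i];
        std::cout << "] ";
    }
    std::cout << '\n';
}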
@@ -410,7 +453,7 @@ static void cut_rowids_in_order(const std::vector<RowidPairs>& rowid_pairs,
Status RowsetColumnUpdateState::_update_source_chunk_by_upt(const UptidToRowidPairs& upt_id_to_rowid_pairs,
const Schema& partial_schema, Rowset* rowset,
OlapReaderStatistics* stats, MemTracker* tracker,
ChunkPtr* source_chunk) {
StreamChunkContainer container) {
CHECK_MEM_LIMIT("RowsetColumnUpdateState::_update_source_chunk_by_upt");
// handle upt files one by one
for (const auto& each : upt_id_to_rowid_pairs) {
@@ -431,13 +474,13 @@ Status RowsetColumnUpdateState::_update_source_chunk_by_upt(const UptidToRowidPa
// 2. update source chunk
std::vector<std::vector<uint32_t>> inorder_source_rowids;
std::vector<std::vector<uint32_t>> inorder_upt_rowids;
cut_rowids_in_order(each.second, &inorder_source_rowids, &inorder_upt_rowids);
cut_rowids_in_order(each.second, &inorder_source_rowids, &inorder_upt_rowids, container);
DCHECK(inorder_source_rowids.size() == inorder_upt_rowids.size());
for (int i = 0; i < inorder_source_rowids.size(); i++) {
auto tmp_chunk = ChunkHelper::new_chunk(partial_schema, inorder_upt_rowids[i].size());
TRY_CATCH_BAD_ALLOC(tmp_chunk->append_selective(*upt_chunk, inorder_upt_rowids[i].data(), 0,
inorder_upt_rowids[i].size()));
RETURN_IF_EXCEPTION((*source_chunk)->update_rows(*tmp_chunk, inorder_source_rowids[i].data()));
RETURN_IF_EXCEPTION(container.chunk_ptr->update_rows(*tmp_chunk, inorder_source_rowids[i].data()));
}
}
return Status::OK();
@@ -685,8 +728,7 @@ Status RowsetColumnUpdateState::finalize(Tablet* tablet, Rowset* rowset, uint32_
cost_str << " [generate delta column group writer] " << watch.elapsed_time();
watch.reset();
OlapReaderStatistics stats;
int64_t total_seek_source_segment_time = 0;
int64_t total_read_column_from_update_time = 0;
int64_t total_do_update_time = 0;
int64_t total_finalize_dcg_time = 0;
int64_t handle_cnt = 0;
// must record unique column id in delta column group
@@ -710,32 +752,35 @@ Status RowsetColumnUpdateState::finalize(Tablet* tablet, Rowset* rowset, uint32_
// 3.2 build partial schema
auto partial_tschema = TabletSchema::create(tschema, selective_update_column_uids);
Schema partial_schema = ChunkHelper::convert_schema(tschema, selective_update_column_ids);
ASSIGN_OR_RETURN(auto delta_column_group_writer, build_writer_fn(each.first, partial_tschema, idx));
// 3.3 read from source segment
ASSIGN_OR_RETURN(auto rowsetid_segid, _find_rowset_seg_id(each.first));
const std::string seg_path = Rowset::segment_file_path(
rowset->rowset_path(), rowsetid_segid.unique_rowset_id, rowsetid_segid.segment_id);
ASSIGN_OR_RETURN(auto source_chunk_ptr,
read_from_source_segment(rowset, partial_schema, tablet, &stats,
latest_applied_version.major_number(), rowsetid_segid, seg_path));
const size_t source_chunk_size = source_chunk_ptr->memory_usage();
tracker->consume(source_chunk_size);
DeferOp tracker_defer([&]() { tracker->release(source_chunk_size); });
// 3.2 read from update segment
RETURN_IF_ERROR(read_from_source_segment_and_update(
rowset, partial_schema, tablet, &stats, latest_applied_version.major_number(), rowsetid_segid,
seg_path, [&](StreamChunkContainer container) {
VLOG(2) << "RowsetColumnUpdateState read from source segment: [byte usage: "
<< container.chunk_ptr->bytes_usage() << " row cnt: " << container.chunk_ptr->num_rows()
<< "] row range : [" << container.start_rowid << ", " << container.end_rowid << ")";
const size_t source_chunk_size = container.chunk_ptr->memory_usage();
tracker->consume(source_chunk_size);
DeferOp tracker_defer([&]() { tracker->release(source_chunk_size); });
// 3.4 read from update segment and do update
RETURN_IF_ERROR(_update_source_chunk_by_upt(each.second, partial_schema, rowset, &stats,
tracker, container));
padding_char_columns(partial_schema, partial_tschema, container.chunk_ptr);
RETURN_IF_ERROR(delta_column_group_writer->append_chunk(*container.chunk_ptr));
return Status::OK();
}));
int64_t t2 = MonotonicMillis();
RETURN_IF_ERROR(_update_source_chunk_by_upt(each.second, partial_schema, rowset, &stats, tracker,
&source_chunk_ptr));
int64_t t3 = MonotonicMillis();
uint64_t segment_file_size = 0;
uint64_t index_size = 0;
uint64_t footer_position = 0;
padding_char_columns(partial_schema, partial_tschema, source_chunk_ptr.get());
ASSIGN_OR_RETURN(auto delta_column_group_writer, build_writer_fn(each.first, partial_tschema, idx));
RETURN_IF_ERROR(delta_column_group_writer->append_chunk(*source_chunk_ptr));
RETURN_IF_ERROR(delta_column_group_writer->finalize(&segment_file_size, &index_size, &footer_position));
int64_t t4 = MonotonicMillis();
total_seek_source_segment_time += t2 - t1;
total_read_column_from_update_time += t3 - t2;
total_finalize_dcg_time += t4 - t3;
int64_t t3 = MonotonicMillis();
total_do_update_time += t2 - t1;
total_finalize_dcg_time += t3 - t2;
// 3.6 prepare column id list and dcg file list
dcg_column_ids[each.first].push_back(selective_unique_update_column_ids);
dcg_column_files[each.first].push_back(file_name(delta_column_group_writer->segment_path()));
@@ -759,9 +804,8 @@ Status RowsetColumnUpdateState::finalize(Tablet* tablet, Rowset* rowset, uint32_
cost_str << " [insert missing rows] " << watch.elapsed_time();
watch.reset();
}
cost_str << strings::Substitute(
" seek_source_segment(ms):$0 read_column_from_update(ms):$1 avg_finalize_dcg_time(ms):$2 ",
total_seek_source_segment_time, total_read_column_from_update_time, total_finalize_dcg_time);
cost_str << strings::Substitute(" total_do_update_time(ms):$0 total_finalize_dcg_time(ms):$1 ",
total_do_update_time, total_finalize_dcg_time);
cost_str << strings::Substitute(
"rss_cnt:$0 update_cnt:$1 column_cnt:$2 update_rows:$3 handle_cnt:$4 insert_rows:$5",
rss_upt_id_to_rowid_pairs.size(), _partial_update_states.size(), update_column_ids.size(), update_rows,
12 changes: 11 additions & 1 deletion be/src/storage/rowset_column_update_state.h
@@ -31,6 +31,16 @@ class Segment;
class RandomAccessFile;
class ColumnIterator;

struct StreamChunkContainer {
Chunk* chunk_ptr = nullptr;
// [start_rowid, end_rowid) is the row range covered by chunk_ptr
uint32_t start_rowid = 0;
uint32_t end_rowid = 0;

// Whether this container covers the given rowid.
bool contains(uint32_t rowid) { return start_rowid <= rowid && rowid < end_rowid; }
};

struct RowsetSegmentStat {
int64_t num_rows_written = 0;
int64_t total_row_size = 0;
@@ -215,7 +225,7 @@ class RowsetColumnUpdateState {

Status _update_source_chunk_by_upt(const UptidToRowidPairs& upt_id_to_rowid_pairs, const Schema& partial_schema,
Rowset* rowset, OlapReaderStatistics* stats, MemTracker* tracker,
ChunkPtr* source_chunk);
StreamChunkContainer container);

private:
int64_t _tablet_id = 0;
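StreamChunkContainer (added above) is a non-owning view of one streamed batch. A tiny runnable illustration, with the chunk pointer omitted and hypothetical row ranges, of how callers test membership and translate a segment-global rowid into a chunk-local index (cf. each.first - container.start_rowid in cut_rowids_in_order):

#include <cstdint>
#include <iostream>

// Trimmed copy of the struct above (chunk pointer omitted) for a runnable demo.
struct StreamChunkContainerDemo {
    uint32_t start_rowid = 0;
    uint32_t end_rowid = 0;
    bool contains(uint32_t rowid) const { return start_rowid <= rowid && rowid < end_rowid; }
};

int main() {
    StreamChunkContainerDemo c{4096, 8192};      // batch holds rows [4096, 8192)
    for (uint32_t rowid : {100u, 4096u, 8191u, 8192u}) {
        if (c.contains(rowid)) {
            std::cout << rowid << " -> local index " << (rowid - c.start_rowid) << '\n';
        } else {
            std::cout << rowid << " -> skipped this round\n"; // handled by another batch
        }
    }
}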

