[BugFix] fix column overflow when handling too large partial update #49054

Merged (3 commits, Aug 7, 2024)
2 changes: 2 additions & 0 deletions be/src/common/config.h
@@ -321,6 +321,8 @@ CONF_mInt64(update_compaction_result_bytes, "1073741824");
CONF_mInt32(update_compaction_delvec_file_io_amp_ratio, "2");
// This config defines the maximum percentage of data allowed per compaction
CONF_mDouble(update_compaction_ratio_threshold, "0.5");
// This config controls the max memory that we can use for partial update.
CONF_mInt64(partial_update_memory_limit_per_worker, "2147483648"); // 2GB

CONF_mInt32(repair_compaction_interval_seconds, "600"); // 10 min
CONF_Int32(manual_compaction_threads, "4");
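The new limit is declared with CONF_mInt64, i.e. mutable at runtime, so it can be tuned without a BE restart. A minimal sketch of how such a gate is consumed (illustrative only; the real check lives in rowset_column_update_state.cpp below, and the stand-in below replaces the config macro):

    #include <cstdint>

    namespace config {
    // Stand-in for the CONF_mInt64 declaration above (2 GB default); the real
    // macro makes this value hot-reloadable.
    int64_t partial_update_memory_limit_per_worker = 2147483648LL;
    }  // namespace config

    // Hypothetical helper: should the accumulated source chunk be flushed?
    static bool over_partial_update_budget(int64_t estimated_bytes) {
        return estimated_bytes > config::partial_update_memory_limit_per_worker;
    }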
3 changes: 1 addition & 2 deletions be/src/storage/rowset/horizontal_update_rowset_writer.cpp
@@ -58,7 +58,7 @@ Status HorizontalUpdateRowsetWriter::add_chunk(const Chunk& chunk) {
RETURN_IF_ERROR(_update_file_writer->finalize(&segment_size, &index_size, &footer_position));
{
std::lock_guard<std::mutex> l(_lock);
_num_rows_upt += _update_file_writer->num_rows_written();
_num_rows_upt += chunk.num_rows();
_num_uptfile++;
_total_update_row_size += static_cast<int64_t>(chunk.bytes_usage());
}
@@ -105,7 +105,6 @@ Status HorizontalUpdateRowsetWriter::flush() {
RETURN_IF_ERROR(_update_file_writer->finalize(&segment_size, &index_size, &footer_position));
{
std::lock_guard<std::mutex> l(_lock);
_num_rows_upt += _update_file_writer->num_rows_written();
_num_uptfile++;
}
_update_file_writer.reset();
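A note on this accounting change, as I read the diff: `_num_rows_upt` used to be incremented from `num_rows_written()` in both `add_chunk()` and `flush()`, so rows could be tallied twice; counting `chunk.num_rows()` at ingest tallies each chunk exactly once, independent of how many update files get finalized. A sketch of the invariant with simplified stand-in types (not the real writer):

    #include <cstdint>

    struct FakeChunk {
        int64_t rows = 0;
        int64_t num_rows() const { return rows; }
    };

    struct FakeUpdateRowsetWriter {
        int64_t num_rows_upt = 0;  // must equal the sum over all add_chunk() calls

        void add_chunk(const FakeChunk& chunk) {
            num_rows_upt += chunk.num_rows();  // tally on ingest, exactly once
        }
        void flush() {
            // rotates the update file; deliberately no row accounting here
        }
    };

With this shape, calling flush() any number of times cannot inflate the logical updated-row count that calc_upt_memory_usage_per_row_column() later divides by.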
3 changes: 2 additions & 1 deletion be/src/storage/rowset/rowset.h
@@ -228,8 +228,9 @@ class Rowset : public std::enable_shared_from_this<Rowset>, public BaseRowset {
size_t data_disk_size() const { return rowset_meta()->total_disk_size(); }
bool empty() const { return rowset_meta()->empty(); }
int64_t num_rows() const override { return rowset_meta()->num_rows(); }
int64_t num_rows_upt() const { return rowset_meta()->num_rows_upt(); }
size_t total_row_size() const { return rowset_meta()->total_row_size(); }
size_t total_update_row_size() const { return rowset_meta()->total_update_row_size(); }
int64_t total_update_row_size() const { return rowset_meta()->total_update_row_size(); }
Version version() const { return rowset_meta()->version(); }
RowsetId rowset_id() const override { return rowset_meta()->rowset_id(); }
std::string rowset_id_str() const { return rowset_meta()->rowset_id().to_string(); }
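The switch from `size_t` to `int64_t` keeps `total_update_row_size` in the same signed domain as `num_rows_upt`; my assumption (not stated in the PR) is that this avoids mixed signed/unsigned arithmetic in the guard and division added below. A self-contained illustration of the pitfall:

    #include <cstdint>
    #include <cstdio>

    int main() {
        std::size_t unsigned_bytes = 0;
        std::int64_t signed_bytes = 0;
        // Unsigned arithmetic wraps instead of going negative:
        std::printf("%zu\n", unsigned_bytes - 1);              // 18446744073709551615
        std::printf("%lld\n", (long long)(signed_bytes - 1));  // -1
        // So a "<= 0" sanity check is only meaningful on the signed type.
        return 0;
    }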
2 changes: 2 additions & 0 deletions be/src/storage/rowset/rowset_meta.h
@@ -98,6 +98,8 @@ class RowsetMeta {

int64_t num_rows() const { return _rowset_meta_pb->num_rows(); }

int64_t num_rows_upt() const { return _rowset_meta_pb->num_rows_upt(); }

void set_num_rows(int64_t num_rows) { _rowset_meta_pb->set_num_rows(num_rows); }

int64_t total_row_size() { return _rowset_meta_pb->total_row_size(); }
1 change: 1 addition & 0 deletions be/src/storage/rowset/rowset_writer.cpp
@@ -165,6 +165,7 @@ StatusOr<RowsetSharedPtr> RowsetWriter::build() {
_rowset_meta_pb->set_num_delete_files(_num_delfile);
_rowset_meta_pb->set_num_update_files(_num_uptfile);
_rowset_meta_pb->set_total_update_row_size(_total_update_row_size);
_rowset_meta_pb->set_num_rows_upt(_num_rows_upt);
if (_num_segment <= 1) {
_rowset_meta_pb->set_segments_overlap_pb(NONOVERLAPPING);
}
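Taken together with the rowset.h and rowset_meta.h hunks, `_num_rows_upt` now makes a full round trip: `RowsetWriter::build()` persists it into the meta protobuf, and `RowsetMeta::num_rows_upt()` / `Rowset::num_rows_upt()` read it back for the estimator below. A compressed sketch of that round trip (assumption: `FakeRowsetMetaPB` is a stand-in for the generated protobuf, whose field lives in a .proto not shown in this diff):

    #include <cstdint>

    // Stand-in for the generated RowsetMetaPB accessors.
    struct FakeRowsetMetaPB {
        int64_t num_rows_upt = 0;
        void set_num_rows_upt(int64_t v) { num_rows_upt = v; }
    };

    int64_t round_trip(int64_t rows_counted_by_writer) {
        FakeRowsetMetaPB pb;
        pb.set_num_rows_upt(rows_counted_by_writer);  // RowsetWriter::build()
        return pb.num_rows_upt;                       // RowsetMeta::num_rows_upt()
    }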
110 changes: 77 additions & 33 deletions be/src/storage/rowset_column_update_state.cpp
@@ -300,11 +300,25 @@ Status RowsetColumnUpdateState::_finalize_partial_update_state(Tablet* tablet, R
return Status::OK();
}

static StatusOr<ChunkPtr> read_from_source_segment(Rowset* rowset, const Schema& schema, Tablet* tablet,
OlapReaderStatistics* stats, int64_t version,
RowsetSegmentId rowset_seg_id, const std::string& path) {
static int64_t calc_upt_memory_usage_per_row_column(Rowset* rowset) {
const auto& txn_meta = rowset->rowset_meta()->get_meta_pb_without_schema().txn_meta();
const int64_t total_update_col_cnt = txn_meta.partial_update_column_ids_size();
// `num_rows_upt` and `total_update_row_size` can be zero when upgrading from an old version;
// in that case we return zero, which means no limit.
if ((rowset->num_rows_upt() * total_update_col_cnt) <= 0) return 0;
return rowset->total_update_row_size() / (rowset->num_rows_upt() * total_update_col_cnt);
}

// Read chunks from the source segment file and call `update_func` to update them.
// `update_func` accepts a StreamChunkContainer holding the chunk and its [start_rowid, end_rowid) row range.
static Status read_from_source_segment_and_update(Rowset* rowset, const Schema& schema, Tablet* tablet,
OlapReaderStatistics* stats, int64_t version,
RowsetSegmentId rowset_seg_id, const std::string& path,
const std::function<Status(StreamChunkContainer)>& update_func) {
CHECK_MEM_LIMIT("RowsetColumnUpdateState::read_from_source_segment");
ASSIGN_OR_RETURN(auto fs, FileSystem::CreateSharedFromString(rowset->rowset_path()));
// We need to estimate the size of each update row before the rows are actually updated.
const int64_t upt_memory_usage_per_row_column = calc_upt_memory_usage_per_row_column(rowset);
auto segment = Segment::open(fs, FileInfo{path}, rowset_seg_id.segment_id, rowset->schema());
if (!segment.ok()) {
LOG(WARNING) << "Fail to open " << path << ": " << segment.status();
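`calc_upt_memory_usage_per_row_column()` turns the persisted stats into an average: bytes of update data divided by (updated rows × updated columns). For instance, 400 MB of update rows spread over 1,000,000 rows and 4 partial-update columns estimates 419430400 / 4000000 = 104 bytes per row-column. The same arithmetic as a standalone sketch (simplified signature, not the real Rowset API):

    #include <cassert>
    #include <cstdint>

    // Returns 0 ("no limit") when the stats are absent, e.g. on rowsets
    // written by a BE version that predates these counters.
    static int64_t estimate_bytes_per_row_column(int64_t total_update_row_size,
                                                 int64_t num_rows_upt,
                                                 int64_t update_col_cnt) {
        if (num_rows_upt * update_col_cnt <= 0) return 0;
        return total_update_row_size / (num_rows_upt * update_col_cnt);
    }

    int main() {
        assert(estimate_bytes_per_row_column(419430400, 1000000, 4) == 104);
        assert(estimate_bytes_per_row_column(0, 0, 4) == 0);  // upgrade path
        return 0;
    }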
@@ -322,11 +336,13 @@ static StatusOr<ChunkPtr> read_from_source_segment(Rowset* rowset, const Schema&
seg_options.version = version;
// do not use the delvec loader
seg_options.dcg_loader = std::make_shared<LocalDeltaColumnGroupLoader>(tablet->data_dir()->get_meta());
seg_options.chunk_size = config::vector_chunk_size;
ASSIGN_OR_RETURN(auto seg_iter, (*segment)->new_iterator(schema, seg_options));
ChunkUniquePtr source_chunk_ptr;
ChunkUniquePtr tmp_chunk_ptr;
TRY_CATCH_BAD_ALLOC(source_chunk_ptr = ChunkHelper::new_chunk(schema, (*segment)->num_rows()));
TRY_CATCH_BAD_ALLOC(tmp_chunk_ptr = ChunkHelper::new_chunk(schema, 1024));
TRY_CATCH_BAD_ALLOC(source_chunk_ptr = ChunkHelper::new_chunk(schema, config::vector_chunk_size));
TRY_CATCH_BAD_ALLOC(tmp_chunk_ptr = ChunkHelper::new_chunk(schema, config::vector_chunk_size));
uint32_t start_rowid = 0;
while (true) {
tmp_chunk_ptr->reset();
auto st = seg_iter->get_next(tmp_chunk_ptr.get());
@@ -336,9 +352,30 @@ static StatusOr<ChunkPtr> read_from_source_segment(Rowset* rowset, const Schema&
return st;
} else {
source_chunk_ptr->append(*tmp_chunk_ptr);
// To avoid excessive memory usage and Column overflow, limit the source chunk's size.
if (source_chunk_ptr->num_rows() >= INT32_MAX ||
(int64_t)source_chunk_ptr->num_rows() * upt_memory_usage_per_row_column * (int64_t)schema.num_fields() >
config::partial_update_memory_limit_per_worker) {
StreamChunkContainer container = {
.chunk_ptr = source_chunk_ptr.get(),
.start_rowid = start_rowid,
.end_rowid = start_rowid + static_cast<uint32_t>(source_chunk_ptr->num_rows())};
RETURN_IF_ERROR(update_func(container));
start_rowid += static_cast<uint32_t>(source_chunk_ptr->num_rows());
source_chunk_ptr->reset();
}
}
}
return source_chunk_ptr;
if (!source_chunk_ptr->is_empty()) {
StreamChunkContainer container = {
.chunk_ptr = source_chunk_ptr.get(),
.start_rowid = start_rowid,
.end_rowid = start_rowid + static_cast<uint32_t>(source_chunk_ptr->num_rows())};
RETURN_IF_ERROR(update_func(container));
start_rowid += static_cast<uint32_t>(source_chunk_ptr->num_rows());
source_chunk_ptr->reset();
}
return Status::OK();
}

// this function builds a delta writer for the delta column group's file (ending with `.col`)
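The rewritten reader no longer materializes the whole segment (the old `new_chunk(schema, (*segment)->num_rows())`); it accumulates `vector_chunk_size`-row batches and hands each bounded slice to `update_func` once either guard trips: the row count approaching INT32_MAX (presumably because Column row indexes are 32-bit, the overflow in the PR title) or the estimated bytes (rows × per-row-column estimate × column count) exceeding the config limit. A minimal, self-contained sketch of this flush-on-threshold streaming pattern (assumption: simplified types; the real code works on Chunk and returns Status):

    #include <cstdint>
    #include <functional>
    #include <vector>

    struct Slice {
        const std::vector<int>* rows;  // non-owning, like StreamChunkContainer
        uint32_t start_rowid;
        uint32_t end_rowid;
    };

    // Stream `input` in pieces of at most `max_bytes` (estimated), invoking
    // `sink` with each piece and its [start, end) row range. The sink must
    // consume the slice before returning, since the buffer is reset after it.
    void stream_rows(const std::vector<int>& input, int64_t bytes_per_row,
                     int64_t max_bytes, const std::function<void(Slice)>& sink) {
        std::vector<int> buf;
        uint32_t start_rowid = 0;
        auto flush = [&] {
            if (buf.empty()) return;
            sink({&buf, start_rowid, start_rowid + (uint32_t)buf.size()});
            start_rowid += (uint32_t)buf.size();
            buf.clear();
        };
        for (int v : input) {
            buf.push_back(v);
            // bytes_per_row == 0 means "no limit", matching the estimator above
            if (bytes_per_row > 0 && (int64_t)buf.size() * bytes_per_row >= max_bytes) flush();
        }
        flush();  // final partial batch, mirroring the !is_empty() tail above
    }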
@@ -382,7 +419,8 @@ static Status read_chunk_from_update_file(const ChunkIteratorPtr& iter, const Ch
// inorder_upt_rowids -> <2, 3, 4>, <5, 6>
static void cut_rowids_in_order(const std::vector<RowidPairs>& rowid_pairs,
std::vector<std::vector<uint32_t>>* inorder_source_rowids,
std::vector<std::vector<uint32_t>>* inorder_upt_rowids) {
std::vector<std::vector<uint32_t>>* inorder_upt_rowids,
StreamChunkContainer container) {
uint32_t last_source_rowid = 0;
std::vector<uint32_t> current_source_rowids;
std::vector<uint32_t> current_upt_rowids;
Expand All @@ -393,11 +431,16 @@ static void cut_rowids_in_order(const std::vector<RowidPairs>& rowid_pairs,
inorder_upt_rowids->back().swap(current_upt_rowids);
};
for (const auto& each : rowid_pairs) {
if (!container.contains(each.first)) {
// skip in this round
continue;
}
if (each.first < last_source_rowid) {
// cut
cut_rowids_fn();
}
current_source_rowids.push_back(each.first);
// Align the source rowid to the container's local offset
current_source_rowids.push_back(each.first - container.start_rowid);
current_upt_rowids.push_back(each.second);
last_source_rowid = each.first;
}
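The container parameter changes `cut_rowids_in_order()` in two ways: pairs whose source rowid falls outside `[start_rowid, end_rowid)` are deferred to a later window, and kept source rowids are rebased to chunk-local indexes. A worked example consistent with the comment above: for pairs <(1,2),(2,3),(3,4),(1,5),(2,6)> and a container over [1,3), the pair (3,4) is skipped this round and the runs become source <0,1>, <0,1> (after subtracting start_rowid = 1) with upt rowids <2,3>, <5,6>. A compact sketch of just the filter-and-rebase part (simplified: returns only the source runs):

    #include <cstdint>
    #include <utility>
    #include <vector>

    using RowidPair = std::pair<uint32_t, uint32_t>;  // (source rowid, upt rowid)

    // Keep pairs whose source rowid lies in [start, end), rebased to the
    // container-local index; cut a new run whenever source rowids go backwards.
    static std::vector<std::vector<uint32_t>> local_source_runs(
            const std::vector<RowidPair>& pairs, uint32_t start, uint32_t end) {
        std::vector<std::vector<uint32_t>> runs(1);
        uint32_t last_source_rowid = 0;
        for (const auto& p : pairs) {
            if (p.first < start || p.first >= end) continue;  // later window
            if (p.first < last_source_rowid && !runs.back().empty()) runs.emplace_back();
            runs.back().push_back(p.first - start);           // rebase
            last_source_rowid = p.first;
        }
        if (runs.back().empty()) runs.pop_back();
        return runs;
    }

    int main() {
        auto runs = local_source_runs({{1, 2}, {2, 3}, {3, 4}, {1, 5}, {2, 6}}, 1, 3);
        return runs.size() == 2 ? 0 : 1;  // expect {{0, 1}, {0, 1}}
    }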
@@ -410,7 +453,7 @@ static void cut_rowids_in_order(const std::vector<RowidPairs>& rowid_pairs,
Status RowsetColumnUpdateState::_update_source_chunk_by_upt(const UptidToRowidPairs& upt_id_to_rowid_pairs,
const Schema& partial_schema, Rowset* rowset,
OlapReaderStatistics* stats, MemTracker* tracker,
ChunkPtr* source_chunk) {
StreamChunkContainer container) {
CHECK_MEM_LIMIT("RowsetColumnUpdateState::_update_source_chunk_by_upt");
// handle upt files one by one
for (const auto& each : upt_id_to_rowid_pairs) {
Expand All @@ -431,13 +474,13 @@ Status RowsetColumnUpdateState::_update_source_chunk_by_upt(const UptidToRowidPa
// 2. update source chunk
std::vector<std::vector<uint32_t>> inorder_source_rowids;
std::vector<std::vector<uint32_t>> inorder_upt_rowids;
cut_rowids_in_order(each.second, &inorder_source_rowids, &inorder_upt_rowids);
cut_rowids_in_order(each.second, &inorder_source_rowids, &inorder_upt_rowids, container);
DCHECK(inorder_source_rowids.size() == inorder_upt_rowids.size());
for (int i = 0; i < inorder_source_rowids.size(); i++) {
auto tmp_chunk = ChunkHelper::new_chunk(partial_schema, inorder_upt_rowids[i].size());
TRY_CATCH_BAD_ALLOC(tmp_chunk->append_selective(*upt_chunk, inorder_upt_rowids[i].data(), 0,
inorder_upt_rowids[i].size()));
RETURN_IF_EXCEPTION((*source_chunk)->update_rows(*tmp_chunk, inorder_source_rowids[i].data()));
RETURN_IF_EXCEPTION(container.chunk_ptr->update_rows(*tmp_chunk, inorder_source_rowids[i].data()));
}
}
return Status::OK();
@@ -685,8 +728,7 @@ Status RowsetColumnUpdateState::finalize(Tablet* tablet, Rowset* rowset, uint32_
cost_str << " [generate delta column group writer] " << watch.elapsed_time();
watch.reset();
OlapReaderStatistics stats;
int64_t total_seek_source_segment_time = 0;
int64_t total_read_column_from_update_time = 0;
int64_t total_do_update_time = 0;
int64_t total_finalize_dcg_time = 0;
int64_t handle_cnt = 0;
// must record unique column id in delta column group
@@ -710,32 +752,35 @@ Status RowsetColumnUpdateState::finalize(Tablet* tablet, Rowset* rowset, uint32_
// 3.2 build partial schema
auto partial_tschema = TabletSchema::create(tschema, selective_update_column_uids);
Schema partial_schema = ChunkHelper::convert_schema(tschema, selective_update_column_ids);
ASSIGN_OR_RETURN(auto delta_column_group_writer, build_writer_fn(each.first, partial_tschema, idx));
// 3.3 read from source segment
ASSIGN_OR_RETURN(auto rowsetid_segid, _find_rowset_seg_id(each.first));
const std::string seg_path = Rowset::segment_file_path(
rowset->rowset_path(), rowsetid_segid.unique_rowset_id, rowsetid_segid.segment_id);
ASSIGN_OR_RETURN(auto source_chunk_ptr,
read_from_source_segment(rowset, partial_schema, tablet, &stats,
latest_applied_version.major_number(), rowsetid_segid, seg_path));
const size_t source_chunk_size = source_chunk_ptr->memory_usage();
tracker->consume(source_chunk_size);
DeferOp tracker_defer([&]() { tracker->release(source_chunk_size); });
// 3.2 read from update segment
RETURN_IF_ERROR(read_from_source_segment_and_update(
rowset, partial_schema, tablet, &stats, latest_applied_version.major_number(), rowsetid_segid,
seg_path, [&](StreamChunkContainer container) {
VLOG(2) << "RowsetColumnUpdateState read from source segment: [byte usage: "
<< container.chunk_ptr->bytes_usage() << " row cnt: " << container.chunk_ptr->num_rows()
<< "] row range : [" << container.start_rowid << ", " << container.end_rowid << ")";
const size_t source_chunk_size = container.chunk_ptr->memory_usage();
tracker->consume(source_chunk_size);
DeferOp tracker_defer([&]() { tracker->release(source_chunk_size); });
// 3.4 read from update segment and do update
RETURN_IF_ERROR(_update_source_chunk_by_upt(each.second, partial_schema, rowset, &stats,
tracker, container));
padding_char_columns(partial_schema, partial_tschema, container.chunk_ptr);
RETURN_IF_ERROR(delta_column_group_writer->append_chunk(*container.chunk_ptr));
return Status::OK();
}));
int64_t t2 = MonotonicMillis();
RETURN_IF_ERROR(_update_source_chunk_by_upt(each.second, partial_schema, rowset, &stats, tracker,
&source_chunk_ptr));
int64_t t3 = MonotonicMillis();
uint64_t segment_file_size = 0;
uint64_t index_size = 0;
uint64_t footer_position = 0;
padding_char_columns(partial_schema, partial_tschema, source_chunk_ptr.get());
ASSIGN_OR_RETURN(auto delta_column_group_writer, build_writer_fn(each.first, partial_tschema, idx));
RETURN_IF_ERROR(delta_column_group_writer->append_chunk(*source_chunk_ptr));
RETURN_IF_ERROR(delta_column_group_writer->finalize(&segment_file_size, &index_size, &footer_position));
int64_t t4 = MonotonicMillis();
total_seek_source_segment_time += t2 - t1;
total_read_column_from_update_time += t3 - t2;
total_finalize_dcg_time += t4 - t3;
int64_t t3 = MonotonicMillis();
total_do_update_time += t2 - t1;
total_finalize_dcg_time += t3 - t2;
// 3.6 prepare column id list and dcg file list
dcg_column_ids[each.first].push_back(selective_unique_update_column_ids);
dcg_column_files[each.first].push_back(file_name(delta_column_group_writer->segment_path()));
Expand All @@ -759,9 +804,8 @@ Status RowsetColumnUpdateState::finalize(Tablet* tablet, Rowset* rowset, uint32_
cost_str << " [insert missing rows] " << watch.elapsed_time();
watch.reset();
}
cost_str << strings::Substitute(
" seek_source_segment(ms):$0 read_column_from_update(ms):$1 avg_finalize_dcg_time(ms):$2 ",
total_seek_source_segment_time, total_read_column_from_update_time, total_finalize_dcg_time);
cost_str << strings::Substitute(" total_do_update_time(ms):$0 total_finalize_dcg_time(ms):$1 ",
total_do_update_time, total_finalize_dcg_time);
cost_str << strings::Substitute(
"rss_cnt:$0 update_cnt:$1 column_cnt:$2 update_rows:$3 handle_cnt:$4 insert_rows:$5",
rss_upt_id_to_rowid_pairs.size(), _partial_update_states.size(), update_column_ids.size(), update_rows,
12 changes: 11 additions & 1 deletion be/src/storage/rowset_column_update_state.h
@@ -31,6 +31,16 @@ class Segment;
class RandomAccessFile;
class ColumnIterator;

struct StreamChunkContainer {
Chunk* chunk_ptr = nullptr;
// [start_rowid, end_rowid) is the row range of this chunk_ptr
uint32_t start_rowid = 0;
uint32_t end_rowid = 0;

// whether this container contains this rowid.
bool contains(uint32_t rowid) { return start_rowid <= rowid && rowid < end_rowid; }
};

struct RowsetSegmentStat {
int64_t num_rows_written = 0;
int64_t total_row_size = 0;
@@ -215,7 +225,7 @@ class RowsetColumnUpdateState {

Status _update_source_chunk_by_upt(const UptidToRowidPairs& upt_id_to_rowid_pairs, const Schema& partial_schema,
Rowset* rowset, OlapReaderStatistics* stats, MemTracker* tracker,
ChunkPtr* source_chunk);
StreamChunkContainer container);

private:
int64_t _tablet_id = 0;
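`StreamChunkContainer` is a non-owning view: the chunk stays owned by the reader, and `[start_rowid, end_rowid)` situates it within the source segment so callers can translate segment-global rowids into chunk-local ones (as a side note, `contains()` could also be marked const). A usage sketch under those assumptions:

    #include <cstdint>
    #include <optional>

    struct FakeContainer {  // stand-in mirroring StreamChunkContainer's fields
        uint32_t start_rowid = 0;
        uint32_t end_rowid = 0;
        bool contains(uint32_t rowid) const { return start_rowid <= rowid && rowid < end_rowid; }
    };

    // Map a segment-global rowid to an index into this window's chunk,
    // or nullopt if a later streaming window will handle it.
    std::optional<uint32_t> local_index(const FakeContainer& c, uint32_t global_rowid) {
        if (!c.contains(global_rowid)) return std::nullopt;
        return global_rowid - c.start_rowid;
    }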
52 changes: 52 additions & 0 deletions be/test/storage/rowset_column_partial_update_test.cpp
@@ -1172,6 +1172,58 @@ TEST_P(RowsetColumnPartialUpdateTest, partial_update_multi_segment_and_column_ba
final_check(tablet, rowsets);
}

TEST_P(RowsetColumnPartialUpdateTest, partial_update_with_source_chunk_limit) {
const int N = 100;
// generate M upt files in each partial rowset
const int M = 2;
auto tablet = create_tablet(rand(), rand());
ASSERT_EQ(1, tablet->updates()->version_history_count());

std::vector<int64_t> keys(2 * N);
std::vector<int64_t> partial_keys(N);
for (int i = 0; i < 2 * N; i++) {
keys[i] = i;
if (i % 2 == 0) {
partial_keys[i / 2] = i;
}
}
auto v1_func = [](int64_t k1) { return (int16_t)(k1 % 100 + 3); };
auto v2_func = [](int64_t k1) { return (int32_t)(k1 % 1000 + 4); };
std::vector<RowsetSharedPtr> rowsets;
rowsets.reserve(20);
// write full rowset first
for (int i = 0; i < 10; i++) {
rowsets.emplace_back(create_rowset(tablet, keys));
}
std::vector<std::shared_ptr<TabletSchema>> partial_schemas;
// partial update v1 and v2 one by one
for (int i = 0; i < 10; i++) {
std::vector<int32_t> column_indexes = {0, (i % 2) + 1};
partial_schemas.push_back(TabletSchema::create(tablet->tablet_schema(), column_indexes));
rowsets.emplace_back(create_partial_rowset(tablet, partial_keys, column_indexes, v1_func, v2_func,
partial_schemas[i], M, PartialUpdateMode::COLUMN_UPDATE_MODE, true));
ASSERT_EQ(rowsets.back()->num_update_files(), M);
}

int64_t version = 1;
int64_t old_vector_chunk_size = config::vector_chunk_size;
int64_t old_partial_update_memory_limit_per_worker = config::partial_update_memory_limit_per_worker;
config::vector_chunk_size = 10;
config::partial_update_memory_limit_per_worker = 0;
commit_rowsets(tablet, rowsets, version);
// check data
ASSERT_TRUE(check_tablet(tablet, version, 2 * N, [](int64_t k1, int64_t v1, int32_t v2) {
if (k1 % 2 == 0) {
return (int16_t)(k1 % 100 + 3) == v1 && (int32_t)(k1 % 1000 + 4) == v2;
} else {
return (int16_t)(k1 % 100 + 1) == v1 && (int32_t)(k1 % 1000 + 2) == v2;
}
}));
config::vector_chunk_size = old_vector_chunk_size;
config::partial_update_memory_limit_per_worker = old_partial_update_memory_limit_per_worker;
final_check(tablet, rowsets);
}

INSTANTIATE_TEST_SUITE_P(RowsetColumnPartialUpdateTest, RowsetColumnPartialUpdateTest,
::testing::Values(1, 1024, 104857600));
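The test forces the new path by shrinking `vector_chunk_size` to 10 and setting the memory limit to 0, so the flush condition trips on every batch and each source segment is processed through many small windows; the assertions then verify the stitched-together updates match the expected values. One design note: the manual save/restore of the two configs is skipped if an ASSERT fails mid-test, since gtest ASSERTs return from the test body early. A hypothetical RAII guard (not in the codebase) would make the restore unconditional:

    #include <utility>

    // Hypothetical helper: restores a mutable config slot when the scope
    // exits, even if an ASSERT_* macro returns early from the test body.
    template <typename T>
    class ScopedConfig {
    public:
        ScopedConfig(T* slot, T new_value) : _slot(slot), _saved(*slot) { *slot = std::move(new_value); }
        ~ScopedConfig() { *_slot = _saved; }
        ScopedConfig(const ScopedConfig&) = delete;
        ScopedConfig& operator=(const ScopedConfig&) = delete;

    private:
        T* _slot;
        T _saved;
    };

    // Usage inside the test would look like:
    //   ScopedConfig g1(&config::vector_chunk_size, 10);
    //   ScopedConfig g2(&config::partial_update_memory_limit_per_worker, int64_t{0});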
