diff --git a/be/src/common/config.h b/be/src/common/config.h index b5dbe79808818..499efe4af8580 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -321,6 +321,8 @@ CONF_mInt64(update_compaction_result_bytes, "1073741824"); CONF_mInt32(update_compaction_delvec_file_io_amp_ratio, "2"); // This config defines the maximum percentage of data allowed per compaction CONF_mDouble(update_compaction_ratio_threshold, "0.5"); +// This config controls max memory that we can use for partial update. +CONF_mInt64(partial_update_memory_limit_per_worker, "2147483648"); // 2GB CONF_mInt32(repair_compaction_interval_seconds, "600"); // 10 min CONF_Int32(manual_compaction_threads, "4"); diff --git a/be/src/storage/rowset/horizontal_update_rowset_writer.cpp b/be/src/storage/rowset/horizontal_update_rowset_writer.cpp index 9da336a3de4e9..525b28fabadd7 100644 --- a/be/src/storage/rowset/horizontal_update_rowset_writer.cpp +++ b/be/src/storage/rowset/horizontal_update_rowset_writer.cpp @@ -58,12 +58,15 @@ Status HorizontalUpdateRowsetWriter::add_chunk(const Chunk& chunk) { RETURN_IF_ERROR(_update_file_writer->finalize(&segment_size, &index_size, &footer_position)); { std::lock_guard l(_lock); - _num_rows_upt += _update_file_writer->num_rows_written(); _num_uptfile++; - _total_update_row_size += static_cast(chunk.bytes_usage()); } ASSIGN_OR_RETURN(_update_file_writer, _create_update_file_writer()); } + { + std::lock_guard l(_lock); + _num_rows_upt += chunk.num_rows(); + _total_update_row_size += static_cast(chunk.bytes_usage()); + } return _update_file_writer->append_chunk(chunk); } @@ -105,7 +108,6 @@ Status HorizontalUpdateRowsetWriter::flush() { RETURN_IF_ERROR(_update_file_writer->finalize(&segment_size, &index_size, &footer_position)); { std::lock_guard l(_lock); - _num_rows_upt += _update_file_writer->num_rows_written(); _num_uptfile++; } _update_file_writer.reset(); diff --git a/be/src/storage/rowset/rowset.h b/be/src/storage/rowset/rowset.h index bd8c46f3e245c..5d5cf694b0395 100644 --- a/be/src/storage/rowset/rowset.h +++ b/be/src/storage/rowset/rowset.h @@ -228,8 +228,9 @@ class Rowset : public std::enable_shared_from_this, public BaseRowset { size_t data_disk_size() const { return rowset_meta()->total_disk_size(); } bool empty() const { return rowset_meta()->empty(); } int64_t num_rows() const override { return rowset_meta()->num_rows(); } + int64_t num_rows_upt() const { return rowset_meta()->num_rows_upt(); } size_t total_row_size() const { return rowset_meta()->total_row_size(); } - size_t total_update_row_size() const { return rowset_meta()->total_update_row_size(); } + int64_t total_update_row_size() const { return rowset_meta()->total_update_row_size(); } Version version() const { return rowset_meta()->version(); } RowsetId rowset_id() const override { return rowset_meta()->rowset_id(); } std::string rowset_id_str() const { return rowset_meta()->rowset_id().to_string(); } diff --git a/be/src/storage/rowset/rowset_meta.h b/be/src/storage/rowset/rowset_meta.h index 3db60cf78b120..635720809d856 100644 --- a/be/src/storage/rowset/rowset_meta.h +++ b/be/src/storage/rowset/rowset_meta.h @@ -98,6 +98,8 @@ class RowsetMeta { int64_t num_rows() const { return _rowset_meta_pb->num_rows(); } + int64_t num_rows_upt() const { return _rowset_meta_pb->num_rows_upt(); } + void set_num_rows(int64_t num_rows) { _rowset_meta_pb->set_num_rows(num_rows); } int64_t total_row_size() { return _rowset_meta_pb->total_row_size(); } diff --git a/be/src/storage/rowset/rowset_writer.cpp b/be/src/storage/rowset/rowset_writer.cpp index 4667be1db8736..e75d8c8d2f0b2 100644 --- a/be/src/storage/rowset/rowset_writer.cpp +++ b/be/src/storage/rowset/rowset_writer.cpp @@ -165,6 +165,7 @@ StatusOr RowsetWriter::build() { _rowset_meta_pb->set_num_delete_files(_num_delfile); _rowset_meta_pb->set_num_update_files(_num_uptfile); _rowset_meta_pb->set_total_update_row_size(_total_update_row_size); + _rowset_meta_pb->set_num_rows_upt(_num_rows_upt); if (_num_segment <= 1) { _rowset_meta_pb->set_segments_overlap_pb(NONOVERLAPPING); } diff --git a/be/src/storage/rowset_column_update_state.cpp b/be/src/storage/rowset_column_update_state.cpp index c5fa0e1595e56..0ce26499e3da8 100644 --- a/be/src/storage/rowset_column_update_state.cpp +++ b/be/src/storage/rowset_column_update_state.cpp @@ -300,11 +300,25 @@ Status RowsetColumnUpdateState::_finalize_partial_update_state(Tablet* tablet, R return Status::OK(); } -static StatusOr read_from_source_segment(Rowset* rowset, const Schema& schema, Tablet* tablet, - OlapReaderStatistics* stats, int64_t version, - RowsetSegmentId rowset_seg_id, const std::string& path) { +static int64_t calc_upt_memory_usage_per_row_column(Rowset* rowset) { + const auto& txn_meta = rowset->rowset_meta()->get_meta_pb_without_schema().txn_meta(); + const int64_t total_update_col_cnt = txn_meta.partial_update_column_ids_size(); + // `num_rows_upt` and `total_update_row_size` could be zero when upgrade from old version, + // then we will return zero and no limit. + if ((rowset->num_rows_upt() * total_update_col_cnt) <= 0) return 0; + return rowset->total_update_row_size() / (rowset->num_rows_upt() * total_update_col_cnt); +} + +// Read chunk from source segment file and call `update_func` to update it. +// `update_func` accept ChunkUniquePtr and [start_rowid, end_rowid) range of this chunk. +static Status read_from_source_segment_and_update(Rowset* rowset, const Schema& schema, Tablet* tablet, + OlapReaderStatistics* stats, int64_t version, + RowsetSegmentId rowset_seg_id, const std::string& path, + const std::function& update_func) { CHECK_MEM_LIMIT("RowsetColumnUpdateState::read_from_source_segment"); ASSIGN_OR_RETURN(auto fs, FileSystem::CreateSharedFromString(rowset->rowset_path())); + // We need to estimate each update rows size before it has been actually updated. + const int64_t upt_memory_usage_per_row_column = calc_upt_memory_usage_per_row_column(rowset); auto segment = Segment::open(fs, FileInfo{path}, rowset_seg_id.segment_id, rowset->schema()); if (!segment.ok()) { LOG(WARNING) << "Fail to open " << path << ": " << segment.status(); @@ -322,11 +336,13 @@ static StatusOr read_from_source_segment(Rowset* rowset, const Schema& seg_options.version = version; // not use delvec loader seg_options.dcg_loader = std::make_shared(tablet->data_dir()->get_meta()); + seg_options.chunk_size = config::vector_chunk_size; ASSIGN_OR_RETURN(auto seg_iter, (*segment)->new_iterator(schema, seg_options)); ChunkUniquePtr source_chunk_ptr; ChunkUniquePtr tmp_chunk_ptr; - TRY_CATCH_BAD_ALLOC(source_chunk_ptr = ChunkHelper::new_chunk(schema, (*segment)->num_rows())); - TRY_CATCH_BAD_ALLOC(tmp_chunk_ptr = ChunkHelper::new_chunk(schema, 1024)); + TRY_CATCH_BAD_ALLOC(source_chunk_ptr = ChunkHelper::new_chunk(schema, config::vector_chunk_size)); + TRY_CATCH_BAD_ALLOC(tmp_chunk_ptr = ChunkHelper::new_chunk(schema, config::vector_chunk_size)); + uint32_t start_rowid = 0; while (true) { tmp_chunk_ptr->reset(); auto st = seg_iter->get_next(tmp_chunk_ptr.get()); @@ -336,9 +352,30 @@ static StatusOr read_from_source_segment(Rowset* rowset, const Schema& return st; } else { source_chunk_ptr->append(*tmp_chunk_ptr); + // Avoid too many memory usage and Column overflow, we will limit source chunk's size. + if (source_chunk_ptr->num_rows() >= INT32_MAX || + (int64_t)source_chunk_ptr->num_rows() * upt_memory_usage_per_row_column * (int64_t)schema.num_fields() > + config::partial_update_memory_limit_per_worker) { + StreamChunkContainer container = { + .chunk_ptr = source_chunk_ptr.get(), + .start_rowid = start_rowid, + .end_rowid = start_rowid + static_cast(source_chunk_ptr->num_rows())}; + RETURN_IF_ERROR(update_func(container)); + start_rowid += static_cast(source_chunk_ptr->num_rows()); + source_chunk_ptr->reset(); + } } } - return source_chunk_ptr; + if (!source_chunk_ptr->is_empty()) { + StreamChunkContainer container = { + .chunk_ptr = source_chunk_ptr.get(), + .start_rowid = start_rowid, + .end_rowid = start_rowid + static_cast(source_chunk_ptr->num_rows())}; + RETURN_IF_ERROR(update_func(container)); + start_rowid += static_cast(source_chunk_ptr->num_rows()); + source_chunk_ptr->reset(); + } + return Status::OK(); } // this function build delta writer for delta column group's file.(end with `.col`) @@ -382,7 +419,8 @@ static Status read_chunk_from_update_file(const ChunkIteratorPtr& iter, const Ch // inorder_upt_rowids -> <2, 3, 4>, <5, 6> static void cut_rowids_in_order(const std::vector& rowid_pairs, std::vector>* inorder_source_rowids, - std::vector>* inorder_upt_rowids) { + std::vector>* inorder_upt_rowids, + StreamChunkContainer container) { uint32_t last_source_rowid = 0; std::vector current_source_rowids; std::vector current_upt_rowids; @@ -393,11 +431,16 @@ static void cut_rowids_in_order(const std::vector& rowid_pairs, inorder_upt_rowids->back().swap(current_upt_rowids); }; for (const auto& each : rowid_pairs) { + if (!container.contains(each.first)) { + // skip in this round + continue; + } if (each.first < last_source_rowid) { // cut cut_rowids_fn(); } - current_source_rowids.push_back(each.first); + // Align rowid + current_source_rowids.push_back(each.first - container.start_rowid); current_upt_rowids.push_back(each.second); last_source_rowid = each.first; } @@ -410,7 +453,7 @@ static void cut_rowids_in_order(const std::vector& rowid_pairs, Status RowsetColumnUpdateState::_update_source_chunk_by_upt(const UptidToRowidPairs& upt_id_to_rowid_pairs, const Schema& partial_schema, Rowset* rowset, OlapReaderStatistics* stats, MemTracker* tracker, - ChunkPtr* source_chunk) { + StreamChunkContainer container) { CHECK_MEM_LIMIT("RowsetColumnUpdateState::_update_source_chunk_by_upt"); // handle upt files one by one for (const auto& each : upt_id_to_rowid_pairs) { @@ -431,13 +474,13 @@ Status RowsetColumnUpdateState::_update_source_chunk_by_upt(const UptidToRowidPa // 2. update source chunk std::vector> inorder_source_rowids; std::vector> inorder_upt_rowids; - cut_rowids_in_order(each.second, &inorder_source_rowids, &inorder_upt_rowids); + cut_rowids_in_order(each.second, &inorder_source_rowids, &inorder_upt_rowids, container); DCHECK(inorder_source_rowids.size() == inorder_upt_rowids.size()); for (int i = 0; i < inorder_source_rowids.size(); i++) { auto tmp_chunk = ChunkHelper::new_chunk(partial_schema, inorder_upt_rowids[i].size()); TRY_CATCH_BAD_ALLOC(tmp_chunk->append_selective(*upt_chunk, inorder_upt_rowids[i].data(), 0, inorder_upt_rowids[i].size())); - RETURN_IF_EXCEPTION((*source_chunk)->update_rows(*tmp_chunk, inorder_source_rowids[i].data())); + RETURN_IF_EXCEPTION(container.chunk_ptr->update_rows(*tmp_chunk, inorder_source_rowids[i].data())); } } return Status::OK(); @@ -685,8 +728,7 @@ Status RowsetColumnUpdateState::finalize(Tablet* tablet, Rowset* rowset, uint32_ cost_str << " [generate delta column group writer] " << watch.elapsed_time(); watch.reset(); OlapReaderStatistics stats; - int64_t total_seek_source_segment_time = 0; - int64_t total_read_column_from_update_time = 0; + int64_t total_do_update_time = 0; int64_t total_finalize_dcg_time = 0; int64_t handle_cnt = 0; // must record unique column id in delta column group @@ -710,32 +752,35 @@ Status RowsetColumnUpdateState::finalize(Tablet* tablet, Rowset* rowset, uint32_ // 3.2 build partial schema auto partial_tschema = TabletSchema::create(tschema, selective_update_column_uids); Schema partial_schema = ChunkHelper::convert_schema(tschema, selective_update_column_ids); + ASSIGN_OR_RETURN(auto delta_column_group_writer, build_writer_fn(each.first, partial_tschema, idx)); // 3.3 read from source segment ASSIGN_OR_RETURN(auto rowsetid_segid, _find_rowset_seg_id(each.first)); const std::string seg_path = Rowset::segment_file_path( rowset->rowset_path(), rowsetid_segid.unique_rowset_id, rowsetid_segid.segment_id); - ASSIGN_OR_RETURN(auto source_chunk_ptr, - read_from_source_segment(rowset, partial_schema, tablet, &stats, - latest_applied_version.major_number(), rowsetid_segid, seg_path)); - const size_t source_chunk_size = source_chunk_ptr->memory_usage(); - tracker->consume(source_chunk_size); - DeferOp tracker_defer([&]() { tracker->release(source_chunk_size); }); - // 3.2 read from update segment + RETURN_IF_ERROR(read_from_source_segment_and_update( + rowset, partial_schema, tablet, &stats, latest_applied_version.major_number(), rowsetid_segid, + seg_path, [&](StreamChunkContainer container) { + VLOG(2) << "RowsetColumnUpdateState read from source segment: [byte usage: " + << container.chunk_ptr->bytes_usage() << " row cnt: " << container.chunk_ptr->num_rows() + << "] row range : [" << container.start_rowid << ", " << container.end_rowid << ")"; + const size_t source_chunk_size = container.chunk_ptr->memory_usage(); + tracker->consume(source_chunk_size); + DeferOp tracker_defer([&]() { tracker->release(source_chunk_size); }); + // 3.4 read from update segment and do update + RETURN_IF_ERROR(_update_source_chunk_by_upt(each.second, partial_schema, rowset, &stats, + tracker, container)); + padding_char_columns(partial_schema, partial_tschema, container.chunk_ptr); + RETURN_IF_ERROR(delta_column_group_writer->append_chunk(*container.chunk_ptr)); + return Status::OK(); + })); int64_t t2 = MonotonicMillis(); - RETURN_IF_ERROR(_update_source_chunk_by_upt(each.second, partial_schema, rowset, &stats, tracker, - &source_chunk_ptr)); - int64_t t3 = MonotonicMillis(); uint64_t segment_file_size = 0; uint64_t index_size = 0; uint64_t footer_position = 0; - padding_char_columns(partial_schema, partial_tschema, source_chunk_ptr.get()); - ASSIGN_OR_RETURN(auto delta_column_group_writer, build_writer_fn(each.first, partial_tschema, idx)); - RETURN_IF_ERROR(delta_column_group_writer->append_chunk(*source_chunk_ptr)); RETURN_IF_ERROR(delta_column_group_writer->finalize(&segment_file_size, &index_size, &footer_position)); - int64_t t4 = MonotonicMillis(); - total_seek_source_segment_time += t2 - t1; - total_read_column_from_update_time += t3 - t2; - total_finalize_dcg_time += t4 - t3; + int64_t t3 = MonotonicMillis(); + total_do_update_time += t2 - t1; + total_finalize_dcg_time += t3 - t2; // 3.6 prepare column id list and dcg file list dcg_column_ids[each.first].push_back(selective_unique_update_column_ids); dcg_column_files[each.first].push_back(file_name(delta_column_group_writer->segment_path())); @@ -759,9 +804,8 @@ Status RowsetColumnUpdateState::finalize(Tablet* tablet, Rowset* rowset, uint32_ cost_str << " [insert missing rows] " << watch.elapsed_time(); watch.reset(); } - cost_str << strings::Substitute( - " seek_source_segment(ms):$0 read_column_from_update(ms):$1 avg_finalize_dcg_time(ms):$2 ", - total_seek_source_segment_time, total_read_column_from_update_time, total_finalize_dcg_time); + cost_str << strings::Substitute(" total_do_update_time(ms):$0 total_finalize_dcg_time(ms):$1 ", + total_do_update_time, total_finalize_dcg_time); cost_str << strings::Substitute( "rss_cnt:$0 update_cnt:$1 column_cnt:$2 update_rows:$3 handle_cnt:$4 insert_rows:$5", rss_upt_id_to_rowid_pairs.size(), _partial_update_states.size(), update_column_ids.size(), update_rows, diff --git a/be/src/storage/rowset_column_update_state.h b/be/src/storage/rowset_column_update_state.h index f0e0295f3b43d..6bba549141019 100644 --- a/be/src/storage/rowset_column_update_state.h +++ b/be/src/storage/rowset_column_update_state.h @@ -31,6 +31,16 @@ class Segment; class RandomAccessFile; class ColumnIterator; +struct StreamChunkContainer { + Chunk* chunk_ptr = nullptr; + // [start_rowid, end_rowid) is range of this chunk_ptr + uint32_t start_rowid = 0; + uint32_t end_rowid = 0; + + // whether this container contains this rowid. + bool contains(uint32_t rowid) { return start_rowid <= rowid && rowid < end_rowid; } +}; + struct RowsetSegmentStat { int64_t num_rows_written = 0; int64_t total_row_size = 0; @@ -215,7 +225,7 @@ class RowsetColumnUpdateState { Status _update_source_chunk_by_upt(const UptidToRowidPairs& upt_id_to_rowid_pairs, const Schema& partial_schema, Rowset* rowset, OlapReaderStatistics* stats, MemTracker* tracker, - ChunkPtr* source_chunk); + StreamChunkContainer container); private: int64_t _tablet_id = 0; diff --git a/be/test/storage/rowset_column_partial_update_test.cpp b/be/test/storage/rowset_column_partial_update_test.cpp index c98de6dee4b15..2d0cc721cbc83 100644 --- a/be/test/storage/rowset_column_partial_update_test.cpp +++ b/be/test/storage/rowset_column_partial_update_test.cpp @@ -1172,6 +1172,58 @@ TEST_P(RowsetColumnPartialUpdateTest, partial_update_multi_segment_and_column_ba final_check(tablet, rowsets); } +TEST_P(RowsetColumnPartialUpdateTest, partial_update_with_source_chunk_limit) { + const int N = 100; + // generate M upt files in each partial rowset + const int M = 2; + auto tablet = create_tablet(rand(), rand()); + ASSERT_EQ(1, tablet->updates()->version_history_count()); + + std::vector keys(2 * N); + std::vector partial_keys(N); + for (int i = 0; i < 2 * N; i++) { + keys[i] = i; + if (i % 2 == 0) { + partial_keys[i / 2] = i; + } + } + auto v1_func = [](int64_t k1) { return (int16_t)(k1 % 100 + 3); }; + auto v2_func = [](int64_t k1) { return (int32_t)(k1 % 1000 + 4); }; + std::vector rowsets; + rowsets.reserve(20); + // write full rowset first + for (int i = 0; i < 10; i++) { + rowsets.emplace_back(create_rowset(tablet, keys)); + } + std::vector> partial_schemas; + // partial update v1 and v2 one by one + for (int i = 0; i < 10; i++) { + std::vector column_indexes = {0, (i % 2) + 1}; + partial_schemas.push_back(TabletSchema::create(tablet->tablet_schema(), column_indexes)); + rowsets.emplace_back(create_partial_rowset(tablet, partial_keys, column_indexes, v1_func, v2_func, + partial_schemas[i], M, PartialUpdateMode::COLUMN_UPDATE_MODE, true)); + ASSERT_EQ(rowsets.back()->num_update_files(), M); + } + + int64_t version = 1; + int64_t old_vector_chunk_size = config::vector_chunk_size; + int64_t old_partial_update_memory_limit_per_worker = config::partial_update_memory_limit_per_worker; + config::vector_chunk_size = 10; + config::partial_update_memory_limit_per_worker = 0; + commit_rowsets(tablet, rowsets, version); + // check data + ASSERT_TRUE(check_tablet(tablet, version, 2 * N, [](int64_t k1, int64_t v1, int32_t v2) { + if (k1 % 2 == 0) { + return (int16_t)(k1 % 100 + 3) == v1 && (int32_t)(k1 % 1000 + 4) == v2; + } else { + return (int16_t)(k1 % 100 + 1) == v1 && (int32_t)(k1 % 1000 + 2) == v2; + } + })); + config::vector_chunk_size = old_vector_chunk_size; + config::partial_update_memory_limit_per_worker = old_partial_update_memory_limit_per_worker; + final_check(tablet, rowsets); +} + INSTANTIATE_TEST_SUITE_P(RowsetColumnPartialUpdateTest, RowsetColumnPartialUpdateTest, ::testing::Values(1, 1024, 104857600)); diff --git a/gensrc/proto/olap_file.proto b/gensrc/proto/olap_file.proto index 326aef84ff7e0..d7dc4bc916625 100644 --- a/gensrc/proto/olap_file.proto +++ b/gensrc/proto/olap_file.proto @@ -175,6 +175,8 @@ message RowsetMetaPB { optional uint32 max_compact_input_rowset_id = 61; // global transaction id optional int64 gtid = 62; + // total number of upt file's rows. + optional int64 num_rows_upt = 63; } enum DataFileType {