From c4107f0d86478d7d5e89cb2f904043b8ea8f500b Mon Sep 17 00:00:00 2001 From: Alex Zhu Date: Mon, 18 Mar 2024 14:59:05 +0800 Subject: [PATCH] [Feature](3/n) Support fast schema evolution in shared data mode Before the fast schema evolution is implemented, the schema of a MaterializedIndex will not change, and the materialized index ID can be used as the unique identifier of its schema. In the upcoming fast schema evolution, the schema of a materialized index may change, and the materialized index id can no longer be used as the unique identifier of the schema. This PR changes the use of index id to schema id. Signed-off-by: Alex Zhu --- be/src/runtime/lake_tablets_channel.cpp | 4 +- be/src/storage/lake/async_delta_writer.cpp | 2 +- be/src/storage/lake/async_delta_writer.h | 6 +-- be/src/storage/lake/delta_writer.cpp | 24 ++++----- be/src/storage/lake/delta_writer.h | 16 +++--- be/src/storage/lake/schema_change.cpp | 2 +- be/src/storage/lake/tablet.cpp | 4 +- be/src/storage/lake/tablet.h | 2 +- be/src/storage/lake/tablet_manager.cpp | 8 +-- be/src/storage/lake/tablet_manager.h | 2 +- .../storage/lake/async_delta_writer_test.cpp | 22 ++++---- .../auto_increment_partial_update_test.cpp | 8 +-- be/test/storage/lake/compaction_task_test.cpp | 16 +++--- .../storage/lake/condition_update_test.cpp | 10 ++-- be/test/storage/lake/delta_writer_test.cpp | 30 +++++------ be/test/storage/lake/partial_update_test.cpp | 52 +++++++++---------- .../lake/primary_key_compaction_task_test.cpp | 30 +++++------ .../storage/lake/primary_key_publish_test.cpp | 40 +++++++------- be/test/storage/lake/schema_change_test.cpp | 26 +++++----- 19 files changed, 153 insertions(+), 151 deletions(-) diff --git a/be/src/runtime/lake_tablets_channel.cpp b/be/src/runtime/lake_tablets_channel.cpp index abf486f1be3dc..6bffec88c788d 100644 --- a/be/src/runtime/lake_tablets_channel.cpp +++ b/be/src/runtime/lake_tablets_channel.cpp @@ -482,10 +482,12 @@ void LakeTabletsChannel::_flush_stale_memtables() { } Status LakeTabletsChannel::_create_delta_writers(const PTabletWriterOpenRequest& params, bool is_incremental) { + int64_t schema_id = 0; std::vector* slots = nullptr; for (auto& index : _schema->indexes()) { if (index->index_id == _index_id) { slots = &index->slots; + schema_id = index->schema_id; break; } } @@ -531,7 +533,7 @@ Status LakeTabletsChannel::_create_delta_writers(const PTabletWriterOpenRequest& .set_table_id(params.table_id()) .set_immutable_tablet_size(params.immutable_tablet_size()) .set_mem_tracker(_mem_tracker) - .set_index_id(_index_id) + .set_schema_id(schema_id) .build()); _delta_writers.emplace(tablet.tablet_id(), std::move(writer)); tablet_ids.emplace_back(tablet.tablet_id()); diff --git a/be/src/storage/lake/async_delta_writer.cpp b/be/src/storage/lake/async_delta_writer.cpp index 640b57743d14a..9c265925b5b98 100644 --- a/be/src/storage/lake/async_delta_writer.cpp +++ b/be/src/storage/lake/async_delta_writer.cpp @@ -310,7 +310,7 @@ StatusOr AsyncDeltaWriterBuilder:: .set_mem_tracker(_mem_tracker) .set_immutable_tablet_size(_immutable_tablet_size) .set_miss_auto_increment_column(_miss_auto_increment_column) - .set_index_id(_index_id) + .set_schema_id(_schema_id) .build()); auto impl = new AsyncDeltaWriterImpl(std::move(writer)); return std::make_unique(impl); diff --git a/be/src/storage/lake/async_delta_writer.h b/be/src/storage/lake/async_delta_writer.h index b2e557993593f..09492b388adc1 100644 --- a/be/src/storage/lake/async_delta_writer.h +++ b/be/src/storage/lake/async_delta_writer.h @@ -169,8 +169,8 @@ class AsyncDeltaWriterBuilder { return *this; } - AsyncDeltaWriterBuilder& set_index_id(int64_t index_id) { - _index_id = index_id; + AsyncDeltaWriterBuilder& set_schema_id(int64_t schema_id) { + _schema_id = schema_id; return *this; } @@ -181,7 +181,7 @@ class AsyncDeltaWriterBuilder { int64_t _txn_id{0}; int64_t _table_id{0}; int64_t _partition_id{0}; - int64_t _index_id{0}; + int64_t _schema_id{0}; int64_t _tablet_id{0}; const std::vector* _slots{nullptr}; int64_t _immutable_tablet_size{0}; diff --git a/be/src/storage/lake/delta_writer.cpp b/be/src/storage/lake/delta_writer.cpp index c7c19b834b39b..84c113b53003b 100644 --- a/be/src/storage/lake/delta_writer.cpp +++ b/be/src/storage/lake/delta_writer.cpp @@ -76,13 +76,13 @@ class DeltaWriterImpl { explicit DeltaWriterImpl(TabletManager* tablet_manager, int64_t tablet_id, int64_t txn_id, int64_t partition_id, const std::vector* slots, std::string merge_condition, bool miss_auto_increment_column, int64_t table_id, int64_t immutable_tablet_size, - MemTracker* mem_tracker, int64_t max_buffer_size, int64_t index_id) + MemTracker* mem_tracker, int64_t max_buffer_size, int64_t schema_id) : _tablet_manager(tablet_manager), _tablet_id(tablet_id), _txn_id(txn_id), _table_id(table_id), _partition_id(partition_id), - _index_id(index_id), + _schema_id(schema_id), _mem_tracker(mem_tracker), _slots(slots), _max_buffer_size(max_buffer_size > 0 ? max_buffer_size : config::write_buffer_size), @@ -94,11 +94,11 @@ class DeltaWriterImpl { DISALLOW_COPY_AND_MOVE(DeltaWriterImpl); - [[nodiscard]] Status open(); + Status open(); - [[nodiscard]] Status write(const Chunk& chunk, const uint32_t* indexes, uint32_t indexes_size); + Status write(const Chunk& chunk, const uint32_t* indexes, uint32_t indexes_size); - [[nodiscard]] Status finish(DeltaWriter::FinishMode mode); + Status finish(DeltaWriter::FinishMode mode); void close(); @@ -110,9 +110,9 @@ class DeltaWriterImpl { [[nodiscard]] MemTracker* mem_tracker() { return _mem_tracker; } - [[nodiscard]] Status flush(); + Status flush(); - [[nodiscard]] Status flush_async(); + Status flush_async(); int64_t queueing_memtable_num() const; @@ -146,7 +146,7 @@ class DeltaWriterImpl { const int64_t _txn_id; const int64_t _table_id; const int64_t _partition_id; - const int64_t _index_id; + const int64_t _schema_id; MemTracker* const _mem_tracker; const std::vector* const _slots; @@ -291,7 +291,7 @@ inline Status DeltaWriterImpl::init_tablet_schema() { return Status::OK(); } ASSIGN_OR_RETURN(auto tablet, _tablet_manager->get_tablet(_tablet_id)); - auto res = tablet.get_schema_by_index_id(_index_id); + auto res = tablet.get_schema_by_id(_schema_id); if (res.ok()) { _tablet_schema = std::move(res).value(); return Status::OK(); @@ -722,12 +722,12 @@ StatusOr DeltaWriterBuilder::build() { if (UNLIKELY(_miss_auto_increment_column && _table_id == 0)) { return Status::InvalidArgument("must set table_id when miss_auto_increment_column is true"); } - if (UNLIKELY(_index_id == 0)) { - return Status::InvalidArgument("index_id not set"); + if (UNLIKELY(_schema_id == 0)) { + return Status::InvalidArgument("schema_id not set"); } auto impl = new DeltaWriterImpl(_tablet_mgr, _tablet_id, _txn_id, _partition_id, _slots, _merge_condition, _miss_auto_increment_column, _table_id, _immutable_tablet_size, _mem_tracker, - _max_buffer_size, _index_id); + _max_buffer_size, _schema_id); return std::make_unique(impl); } diff --git a/be/src/storage/lake/delta_writer.h b/be/src/storage/lake/delta_writer.h index 444c6769c345f..f924d04397350 100644 --- a/be/src/storage/lake/delta_writer.h +++ b/be/src/storage/lake/delta_writer.h @@ -54,21 +54,21 @@ class DeltaWriter { DISALLOW_COPY_AND_MOVE(DeltaWriter); // NOTE: It's ok to invoke this method in a bthread, there is no I/O operation in this method. - [[nodiscard]] Status open(); + Status open(); // NOTE: Do NOT invoke this method in a bthread. - [[nodiscard]] Status write(const Chunk& chunk, const uint32_t* indexes, uint32_t indexes_size); + Status write(const Chunk& chunk, const uint32_t* indexes, uint32_t indexes_size); // NOTE: Do NOT invoke this method in a bthread. - [[nodiscard]] Status finish(FinishMode mode = kWriteTxnLog); + Status finish(FinishMode mode = kWriteTxnLog); // Manual flush, mainly used in UT // NOTE: Do NOT invoke this method in a bthread. - [[nodiscard]] Status flush(); + Status flush(); // Manual flush, mainly used in UT // NOTE: Do NOT invoke this method in a bthread. - [[nodiscard]] Status flush_async(); + Status flush_async(); // NOTE: Do NOT invoke this method in a bthread unless you are sure that `write()` has never been called. void close(); @@ -169,8 +169,8 @@ class DeltaWriterBuilder { return *this; } - DeltaWriterBuilder& set_index_id(int64_t index_id) { - _index_id = index_id; + DeltaWriterBuilder& set_schema_id(int64_t schema_id) { + _schema_id = schema_id; return *this; } @@ -181,7 +181,7 @@ class DeltaWriterBuilder { int64_t _txn_id{0}; int64_t _table_id{0}; int64_t _partition_id{0}; - int64_t _index_id{0}; + int64_t _schema_id{0}; int64_t _tablet_id{0}; const std::vector* _slots{nullptr}; std::string _merge_condition{}; diff --git a/be/src/storage/lake/schema_change.cpp b/be/src/storage/lake/schema_change.cpp index 6ad0779a93a25..4fb678402f8ce 100644 --- a/be/src/storage/lake/schema_change.cpp +++ b/be/src/storage/lake/schema_change.cpp @@ -245,7 +245,7 @@ Status SortedSchemaChange::process(RowsetPtr rowset, RowsetMetadata* new_rowset_ .set_txn_id(_txn_id) .set_max_buffer_size(_max_buffer_size) .set_mem_tracker(CurrentThread::mem_tracker()) - .set_index_id(_new_tablet_schema->id()) // TODO: pass tablet schema directly + .set_schema_id(_new_tablet_schema->id()) // TODO: pass tablet schema directly .build()); RETURN_IF_ERROR(writer->open()); DeferOp defer([&]() { writer->close(); }); diff --git a/be/src/storage/lake/tablet.cpp b/be/src/storage/lake/tablet.cpp index 83668e91dfcde..4369397c95856 100644 --- a/be/src/storage/lake/tablet.cpp +++ b/be/src/storage/lake/tablet.cpp @@ -96,8 +96,8 @@ StatusOr> Tablet::get_schema() { return _mgr->get_tablet_schema(_id, &_version_hint); } -StatusOr> Tablet::get_schema_by_index_id(int64_t index_id) { - return _mgr->get_tablet_schema_by_index_id(_id, index_id); +StatusOr> Tablet::get_schema_by_id(int64_t index_id) { + return _mgr->get_tablet_schema_by_id(_id, index_id); } StatusOr> Tablet::get_rowsets(int64_t version) { diff --git a/be/src/storage/lake/tablet.h b/be/src/storage/lake/tablet.h index 2849d4fddc6d5..096a66e12f57a 100644 --- a/be/src/storage/lake/tablet.h +++ b/be/src/storage/lake/tablet.h @@ -92,7 +92,7 @@ class Tablet { // NOTE: This method may update the version hint StatusOr> get_schema(); - StatusOr> get_schema_by_index_id(int64_t index_id); + StatusOr> get_schema_by_id(int64_t index_id); StatusOr> get_rowsets(int64_t version); diff --git a/be/src/storage/lake/tablet_manager.cpp b/be/src/storage/lake/tablet_manager.cpp index 14f7b55bcde9f..398519d5465bb 100644 --- a/be/src/storage/lake/tablet_manager.cpp +++ b/be/src/storage/lake/tablet_manager.cpp @@ -421,7 +421,7 @@ StatusOr TabletManager::get_tablet_schema(int64_t tablet_id, in auto index_id_iter = properties.find("indexId"); if (index_id_iter != properties.end()) { auto index_id = std::atol(index_id_iter->second.data()); - auto res = get_tablet_schema_by_index_id(tablet_id, index_id); + auto res = get_tablet_schema_by_id(tablet_id, index_id); if (res.ok()) { return res; } else if (res.status().is_not_found()) { @@ -468,17 +468,17 @@ StatusOr TabletManager::get_tablet_schema(int64_t tablet_id, in return schema; } -StatusOr TabletManager::get_tablet_schema_by_index_id(int64_t tablet_id, int64_t index_id) { +StatusOr TabletManager::get_tablet_schema_by_id(int64_t tablet_id, int64_t index_id) { auto global_cache_key = global_schema_cache_key(index_id); auto schema = _metacache->lookup_tablet_schema(global_cache_key); - TEST_SYNC_POINT_CALLBACK("get_tablet_schema_by_index_id.1", &schema); + TEST_SYNC_POINT_CALLBACK("get_tablet_schema_by_id.1", &schema); if (schema != nullptr) { return schema; } // else: Cache miss, read the schema file auto schema_file_path = join_path(tablet_root_location(tablet_id), schema_filename(index_id)); auto schema_or = load_and_parse_schema_file(schema_file_path); - TEST_SYNC_POINT_CALLBACK("get_tablet_schema_by_index_id.2", &schema_or); + TEST_SYNC_POINT_CALLBACK("get_tablet_schema_by_id.2", &schema_or); if (schema_or.ok()) { VLOG(3) << "Got tablet schema of id " << index_id << " for tablet " << tablet_id; schema = std::move(schema_or).value(); diff --git a/be/src/storage/lake/tablet_manager.h b/be/src/storage/lake/tablet_manager.h index c57839700c821..cb8438ce25270 100644 --- a/be/src/storage/lake/tablet_manager.h +++ b/be/src/storage/lake/tablet_manager.h @@ -177,7 +177,7 @@ class TabletManager { Status create_schema_file(int64_t tablet_id, const TabletSchemaPB& schema_pb); StatusOr load_and_parse_schema_file(const std::string& path); StatusOr get_tablet_schema(int64_t tablet_id, int64_t* version_hint = nullptr); - StatusOr get_tablet_schema_by_index_id(int64_t tablet_id, int64_t index_id); + StatusOr get_tablet_schema_by_id(int64_t tablet_id, int64_t index_id); StatusOr load_tablet_metadata(const std::string& metadata_location, bool fill_cache); StatusOr load_txn_log(const std::string& txn_log_location, bool fill_cache); diff --git a/be/test/storage/lake/async_delta_writer_test.cpp b/be/test/storage/lake/async_delta_writer_test.cpp index ef13e2819c026..add058770cf2e 100644 --- a/be/test/storage/lake/async_delta_writer_test.cpp +++ b/be/test/storage/lake/async_delta_writer_test.cpp @@ -97,7 +97,7 @@ TEST_F(LakeAsyncDeltaWriterTest, test_open) { .set_txn_id(txn_id) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_tablet_schema->id()) + .set_schema_id(_tablet_schema->id()) .build()); ASSERT_OK(delta_writer->open()); delta_writer->close(); @@ -112,7 +112,7 @@ TEST_F(LakeAsyncDeltaWriterTest, test_open) { .set_txn_id(txn_id) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_tablet_schema->id()) + .set_schema_id(_tablet_schema->id()) .build()); ASSERT_OK(delta_writer->open()); ASSERT_OK(delta_writer->open()); @@ -129,7 +129,7 @@ TEST_F(LakeAsyncDeltaWriterTest, test_open) { .set_txn_id(txn_id) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_tablet_schema->id()) + .set_schema_id(_tablet_schema->id()) .build()); auto t1 = std::thread([&]() { for (int i = 0; i < 10000; i++) { @@ -165,7 +165,7 @@ TEST_F(LakeAsyncDeltaWriterTest, test_write) { .set_txn_id(txn_id) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_tablet_schema->id()) + .set_schema_id(_tablet_schema->id()) .build()); ASSERT_OK(delta_writer->open()); // Call open() again @@ -249,7 +249,7 @@ TEST_F(LakeAsyncDeltaWriterTest, test_write_concurrently) { .set_txn_id(txn_id) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_tablet_schema->id()) + .set_schema_id(_tablet_schema->id()) .build()); ASSERT_OK(delta_writer->open()); @@ -351,7 +351,7 @@ TEST_F(LakeAsyncDeltaWriterTest, test_write_after_close) { .set_txn_id(txn_id) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_tablet_schema->id()) + .set_schema_id(_tablet_schema->id()) .build()); ASSERT_OK(delta_writer->open()); @@ -387,7 +387,7 @@ TEST_F(LakeAsyncDeltaWriterTest, test_finish_after_close) { .set_txn_id(txn_id) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_tablet_schema->id()) + .set_schema_id(_tablet_schema->id()) .build()); ASSERT_OK(delta_writer->open()); @@ -413,7 +413,7 @@ TEST_F(LakeAsyncDeltaWriterTest, test_close) { .set_txn_id(txn_id) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_tablet_schema->id()) + .set_schema_id(_tablet_schema->id()) .build()); ASSERT_OK(delta_writer->open()); @@ -430,7 +430,7 @@ TEST_F(LakeAsyncDeltaWriterTest, test_close) { .set_txn_id(txn_id) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_tablet_schema->id()) + .set_schema_id(_tablet_schema->id()) .build()); ASSERT_OK(delta_writer->open()); auto t1 = std::thread([&]() { @@ -457,7 +457,7 @@ TEST_F(LakeAsyncDeltaWriterTest, test_open_after_close) { .set_txn_id(txn_id) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_tablet_schema->id()) + .set_schema_id(_tablet_schema->id()) .build()); ASSERT_OK(delta_writer->open()); delta_writer->close(); @@ -488,7 +488,7 @@ TEST_F(LakeAsyncDeltaWriterTest, test_concurrent_write_and_close) { .set_txn_id(txn_id) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_tablet_schema->id()) + .set_schema_id(_tablet_schema->id()) .build()); ASSERT_OK(delta_writer->open()); diff --git a/be/test/storage/lake/auto_increment_partial_update_test.cpp b/be/test/storage/lake/auto_increment_partial_update_test.cpp index 3fea6bc19d85f..7bceb3a7e17b4 100644 --- a/be/test/storage/lake/auto_increment_partial_update_test.cpp +++ b/be/test/storage/lake/auto_increment_partial_update_test.cpp @@ -199,7 +199,7 @@ TEST_F(LakeAutoIncrementPartialUpdateTest, test_write) { .set_txn_id(txn_id) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_tablet_schema->id()) + .set_schema_id(_tablet_schema->id()) .build()); ASSERT_OK(delta_writer->open()); ASSERT_OK(delta_writer->write(chunk0, indexes.data(), indexes.size())); @@ -229,7 +229,7 @@ TEST_F(LakeAutoIncrementPartialUpdateTest, test_write) { .set_txn_id(txn_id) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_tablet_schema->id()) + .set_schema_id(_tablet_schema->id()) .set_slot_descriptors(&_slot_pointers) .set_miss_auto_increment_column(true) .set_table_id(next_id()) @@ -269,7 +269,7 @@ TEST_F(LakeAutoIncrementPartialUpdateTest, test_resolve_conflict) { .set_txn_id(txn_id) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_tablet_schema->id()) + .set_schema_id(_tablet_schema->id()) .build()); ASSERT_OK(delta_writer->open()); ASSERT_OK(delta_writer->write(chunk0, indexes.data(), indexes.size())); @@ -301,7 +301,7 @@ TEST_F(LakeAutoIncrementPartialUpdateTest, test_resolve_conflict) { .set_txn_id(txn_id) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_tablet_schema->id()) + .set_schema_id(_tablet_schema->id()) .set_slot_descriptors(&_slot_pointers) .set_miss_auto_increment_column(true) .set_table_id(next_id()) diff --git a/be/test/storage/lake/compaction_task_test.cpp b/be/test/storage/lake/compaction_task_test.cpp index 644af12acefba..13d58808e7ca6 100644 --- a/be/test/storage/lake/compaction_task_test.cpp +++ b/be/test/storage/lake/compaction_task_test.cpp @@ -115,7 +115,7 @@ class LakeDuplicateKeyCompactionTest : public LakeCompactionTest { std::shared_ptr _tablet_metadata; std::shared_ptr _tablet_schema; std::shared_ptr _schema; - int64_t _partition_id = 4560; + int64_t _partition_id = next_id(); }; TEST_P(LakeDuplicateKeyCompactionTest, test1) { @@ -137,7 +137,7 @@ TEST_P(LakeDuplicateKeyCompactionTest, test1) { .set_txn_id(txn_id) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_tablet_schema->id()) + .set_schema_id(_tablet_schema->id()) .build()); ASSERT_OK(delta_writer->open()); ASSERT_OK(delta_writer->write(chunk0, indexes.data(), indexes.size())); @@ -244,7 +244,7 @@ class LakeDuplicateKeyOverlapSegmentsCompactionTest : public LakeCompactionTest std::shared_ptr _tablet_metadata; std::shared_ptr _tablet_schema; std::shared_ptr _schema; - int64_t _partition_id = 4563; + int64_t _partition_id = next_id(); }; TEST_P(LakeDuplicateKeyOverlapSegmentsCompactionTest, test) { @@ -266,7 +266,7 @@ TEST_P(LakeDuplicateKeyOverlapSegmentsCompactionTest, test) { .set_txn_id(txn_id) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_tablet_schema->id()) + .set_schema_id(_tablet_schema->id()) .build()); ASSERT_OK(delta_writer->open()); for (int j = 0; j < i + 1; ++j) { @@ -394,7 +394,7 @@ class LakeUniqueKeyCompactionTest : public LakeCompactionTest { std::shared_ptr _tablet_metadata; std::shared_ptr _tablet_schema; std::shared_ptr _schema; - int64_t _partition_id = 4561; + int64_t _partition_id = next_id(); }; TEST_P(LakeUniqueKeyCompactionTest, test1) { @@ -416,7 +416,7 @@ TEST_P(LakeUniqueKeyCompactionTest, test1) { .set_txn_id(txn_id) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_tablet_schema->id()) + .set_schema_id(_tablet_schema->id()) .build()); ASSERT_OK(delta_writer->open()); ASSERT_OK(delta_writer->write(chunk0, indexes.data(), indexes.size())); @@ -508,7 +508,7 @@ class LakeUniqueKeyCompactionWithDeleteTest : public LakeCompactionTest { std::shared_ptr _tablet_metadata; std::shared_ptr _tablet_schema; std::shared_ptr _schema; - int64_t _partition_id = 4562; + int64_t _partition_id = next_id(); }; TEST_P(LakeUniqueKeyCompactionWithDeleteTest, test_base_compaction_with_delete) { @@ -530,7 +530,7 @@ TEST_P(LakeUniqueKeyCompactionWithDeleteTest, test_base_compaction_with_delete) .set_txn_id(txn_id) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_tablet_schema->id()) + .set_schema_id(_tablet_schema->id()) .build()); ASSERT_OK(delta_writer->open()); ASSERT_OK(delta_writer->write(chunk0, indexes.data(), indexes.size())); diff --git a/be/test/storage/lake/condition_update_test.cpp b/be/test/storage/lake/condition_update_test.cpp index f1223a0fc8259..46be42a90d525 100644 --- a/be/test/storage/lake/condition_update_test.cpp +++ b/be/test/storage/lake/condition_update_test.cpp @@ -155,7 +155,7 @@ TEST_P(ConditionUpdateTest, test_condition_update) { .set_txn_id(txn_id) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_tablet_schema->id()) + .set_schema_id(_tablet_schema->id()) .build()); ASSERT_OK(delta_writer->open()); ASSERT_OK(delta_writer->write(chunk0, indexes.data(), indexes.size())); @@ -189,7 +189,7 @@ TEST_P(ConditionUpdateTest, test_condition_update) { .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) .set_merge_condition("c1") - .set_index_id(_tablet_schema->id()) + .set_schema_id(_tablet_schema->id()) .build()); ASSERT_OK(delta_writer->open()); ASSERT_OK(delta_writer->write(chunks[i], indexes.data(), indexes.size())); @@ -227,7 +227,7 @@ TEST_P(ConditionUpdateTest, test_condition_update_multi_segment) { .set_txn_id(txn_id) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_tablet_schema->id()) + .set_schema_id(_tablet_schema->id()) .build()); ASSERT_OK(delta_writer->open()); ASSERT_OK(delta_writer->write(chunk0, indexes.data(), indexes.size())); @@ -255,7 +255,7 @@ TEST_P(ConditionUpdateTest, test_condition_update_multi_segment) { .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) .set_merge_condition("c1") - .set_index_id(_tablet_schema->id()) + .set_schema_id(_tablet_schema->id()) .build()); ASSERT_OK(delta_writer->open()); ASSERT_OK(delta_writer->write(i == 0 ? chunk1 : chunk2, indexes.data(), indexes.size())); @@ -295,7 +295,7 @@ TEST_P(ConditionUpdateTest, test_condition_update_in_memtable) { .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) .set_merge_condition("c1") - .set_index_id(_tablet_schema->id()) + .set_schema_id(_tablet_schema->id()) .build()); ASSERT_OK(delta_writer->open()); // finish condition merge in one memtable diff --git a/be/test/storage/lake/delta_writer_test.cpp b/be/test/storage/lake/delta_writer_test.cpp index 5ca0b7a7a28b9..fbefa61dcbf1f 100644 --- a/be/test/storage/lake/delta_writer_test.cpp +++ b/be/test/storage/lake/delta_writer_test.cpp @@ -138,7 +138,7 @@ TEST_F(LakeDeltaWriterTest, test_build) { .set_table_id(8) .build(); ASSERT_TRUE(!res.ok()); - ASSERT_EQ("index_id not set", res.status().message()); + ASSERT_EQ("schema_id not set", res.status().message()); } } @@ -153,7 +153,7 @@ TEST_F(LakeDeltaWriterTest, test_open) { .set_txn_id(txn_id) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_tablet_schema->id()) + .set_schema_id(_tablet_schema->id()) .build()); ASSERT_OK(delta_writer->open()); delta_writer->close(); @@ -178,7 +178,7 @@ TEST_F(LakeDeltaWriterTest, test_write) { .set_txn_id(txn_id) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_tablet_schema->id()) + .set_schema_id(_tablet_schema->id()) .build()); ASSERT_OK(delta_writer->open()); @@ -251,9 +251,9 @@ TEST_F(LakeDeltaWriterTest, test_write_without_schema_file) { SyncPoint::GetInstance()->EnableProcessing(); - SyncPoint::GetInstance()->SetCallBack("get_tablet_schema_by_index_id.1", + SyncPoint::GetInstance()->SetCallBack("get_tablet_schema_by_id.1", [](void* arg) { ((std::shared_ptr*)arg)->reset(); }); - SyncPoint::GetInstance()->SetCallBack("get_tablet_schema_by_index_id.2", [&](void* arg) { + SyncPoint::GetInstance()->SetCallBack("get_tablet_schema_by_id.2", [&](void* arg) { *((StatusOr>*)arg) = Status::NotFound("mocked not found error"); invoked = true; }); @@ -267,7 +267,7 @@ TEST_F(LakeDeltaWriterTest, test_write_without_schema_file) { .set_txn_id(txn_id) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_tablet_schema->id()) + .set_schema_id(_tablet_schema->id()) .build()); ASSERT_OK(delta_writer->open()); @@ -279,7 +279,7 @@ TEST_F(LakeDeltaWriterTest, test_write_without_schema_file) { // close delta_writer->close(); - ASSERT_TRUE(invoked) << "get_tablet_schema_by_index_id not invoked"; + ASSERT_TRUE(invoked) << "get_tablet_schema_by_id not invoked"; SyncPoint::GetInstance()->ClearAllCallBacks(); SyncPoint::GetInstance()->DisableProcessing(); } @@ -302,7 +302,7 @@ TEST_F(LakeDeltaWriterTest, test_close) { .set_txn_id(txn_id) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_tablet_schema->id()) + .set_schema_id(_tablet_schema->id()) .build()); ASSERT_OK(delta_writer->open()); @@ -341,7 +341,7 @@ TEST_F(LakeDeltaWriterTest, test_finish_without_write_txn_log) { .set_txn_id(txn_id) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_tablet_schema->id()) + .set_schema_id(_tablet_schema->id()) .build()); ASSERT_OK(delta_writer->open()); @@ -373,7 +373,7 @@ TEST_F(LakeDeltaWriterTest, test_empty_write) { .set_txn_id(txn_id) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_tablet_schema->id()) + .set_schema_id(_tablet_schema->id()) .build()); ASSERT_OK(delta_writer->open()); ASSERT_OK(delta_writer->finish()); @@ -402,7 +402,7 @@ TEST_F(LakeDeltaWriterTest, test_negative_txn_id) { .set_txn_id(-1) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_tablet_schema->id()) + .set_schema_id(_tablet_schema->id()) .build()); ASSERT_OK(delta_writer->open()); ASSERT_ERROR(delta_writer->finish()); @@ -427,7 +427,7 @@ TEST_F(LakeDeltaWriterTest, test_memory_limit_unreached) { .set_txn_id(txn_id) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_tablet_schema->id()) + .set_schema_id(_tablet_schema->id()) .build()); ASSERT_OK(delta_writer->open()); @@ -477,7 +477,7 @@ TEST_F(LakeDeltaWriterTest, test_reached_memory_limit) { .set_txn_id(txn_id) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_tablet_schema->id()) + .set_schema_id(_tablet_schema->id()) .build()); ASSERT_OK(delta_writer->open()); @@ -528,7 +528,7 @@ TEST_F(LakeDeltaWriterTest, test_reached_parent_memory_limit) { .set_txn_id(txn_id) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_tablet_schema->id()) + .set_schema_id(_tablet_schema->id()) .build()); ASSERT_OK(delta_writer->open()); @@ -580,7 +580,7 @@ TEST_F(LakeDeltaWriterTest, test_memtable_full) { .set_txn_id(txn_id) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_tablet_schema->id()) + .set_schema_id(_tablet_schema->id()) .build()); ASSERT_OK(delta_writer->open()); diff --git a/be/test/storage/lake/partial_update_test.cpp b/be/test/storage/lake/partial_update_test.cpp index 75cfefe42458a..4392d3ad26bb8 100644 --- a/be/test/storage/lake/partial_update_test.cpp +++ b/be/test/storage/lake/partial_update_test.cpp @@ -203,7 +203,7 @@ TEST_P(LakePartialUpdateTest, test_write) { .set_txn_id(txn_id) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_tablet_schema->id()) + .set_schema_id(_tablet_schema->id()) .build()); ASSERT_OK(delta_writer->open()); ASSERT_OK(delta_writer->write(chunk0, indexes.data(), indexes.size())); @@ -226,7 +226,7 @@ TEST_P(LakePartialUpdateTest, test_write) { .set_txn_id(txn_id) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_tablet_schema->id()) + .set_schema_id(_tablet_schema->id()) .set_slot_descriptors(&_slot_pointers) .build()); ASSERT_OK(delta_writer->open()); @@ -266,7 +266,7 @@ TEST_P(LakePartialUpdateTest, test_write_multi_segment) { .set_txn_id(txn_id) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_tablet_schema->id()) + .set_schema_id(_tablet_schema->id()) .build()); ASSERT_OK(delta_writer->open()); ASSERT_OK(delta_writer->write(chunk0, indexes.data(), indexes.size())); @@ -291,7 +291,7 @@ TEST_P(LakePartialUpdateTest, test_write_multi_segment) { .set_txn_id(txn_id) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_tablet_schema->id()) + .set_schema_id(_tablet_schema->id()) .set_slot_descriptors(&_slot_pointers) .build()); ASSERT_OK(delta_writer->open()); @@ -336,7 +336,7 @@ TEST_P(LakePartialUpdateTest, test_write_multi_segment_by_diff_val) { .set_txn_id(txn_id) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_tablet_schema->id()) + .set_schema_id(_tablet_schema->id()) .build()); ASSERT_OK(delta_writer->open()); ASSERT_OK(delta_writer->write(chunk0, indexes.data(), indexes.size())); @@ -361,7 +361,7 @@ TEST_P(LakePartialUpdateTest, test_write_multi_segment_by_diff_val) { .set_txn_id(txn_id) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_tablet_schema->id()) + .set_schema_id(_tablet_schema->id()) .set_slot_descriptors(&_slot_pointers) .build()); ASSERT_OK(delta_writer->open()); @@ -405,7 +405,7 @@ TEST_P(LakePartialUpdateTest, test_resolve_conflict) { .set_txn_id(txn_id) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_tablet_schema->id()) + .set_schema_id(_tablet_schema->id()) .build()); ASSERT_OK(delta_writer->open()); ASSERT_OK(delta_writer->write(chunk0, indexes.data(), indexes.size())); @@ -430,7 +430,7 @@ TEST_P(LakePartialUpdateTest, test_resolve_conflict) { .set_txn_id(txn_id) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_tablet_schema->id()) + .set_schema_id(_tablet_schema->id()) .set_slot_descriptors(&_slot_pointers) .build()); ASSERT_OK(delta_writer->open()); @@ -474,7 +474,7 @@ TEST_P(LakePartialUpdateTest, test_resolve_conflict_multi_segment) { .set_txn_id(txn_id) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_tablet_schema->id()) + .set_schema_id(_tablet_schema->id()) .build()); ASSERT_OK(delta_writer->open()); ASSERT_OK(delta_writer->write(chunk0, indexes.data(), indexes.size())); @@ -501,7 +501,7 @@ TEST_P(LakePartialUpdateTest, test_resolve_conflict_multi_segment) { .set_txn_id(txn_id) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_tablet_schema->id()) + .set_schema_id(_tablet_schema->id()) .set_slot_descriptors(&_slot_pointers) .build()); ASSERT_OK(delta_writer->open()); @@ -546,7 +546,7 @@ TEST_P(LakePartialUpdateTest, test_resolve_conflict2) { .set_txn_id(txn_id) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_tablet_schema->id()) + .set_schema_id(_tablet_schema->id()) .build()); ASSERT_OK(delta_writer->open()); ASSERT_OK(delta_writer->write(chunk0, indexes.data(), indexes.size())); @@ -579,7 +579,7 @@ TEST_P(LakePartialUpdateTest, test_resolve_conflict2) { .set_txn_id(txn_id) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_tablet_schema->id()) + .set_schema_id(_tablet_schema->id()) .set_slot_descriptors(&_slot_pointers) .build()); ASSERT_OK(delta_writer->open()); @@ -619,7 +619,7 @@ TEST_P(LakePartialUpdateTest, test_write_with_index_reload) { .set_txn_id(txn_id) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_tablet_schema->id()) + .set_schema_id(_tablet_schema->id()) .build()); ASSERT_OK(delta_writer->open()); ASSERT_OK(delta_writer->write(chunk0, indexes.data(), indexes.size())); @@ -645,7 +645,7 @@ TEST_P(LakePartialUpdateTest, test_write_with_index_reload) { .set_txn_id(txn_id) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_tablet_schema->id()) + .set_schema_id(_tablet_schema->id()) .set_slot_descriptors(&_slot_pointers) .build()); ASSERT_OK(delta_writer->open()); @@ -686,7 +686,7 @@ TEST_P(LakePartialUpdateTest, test_partial_update_publish_retry) { .set_txn_id(txn_id) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_tablet_schema->id()) + .set_schema_id(_tablet_schema->id()) .build()); ASSERT_OK(delta_writer->open()); ASSERT_OK(delta_writer->write(chunk0, indexes.data(), indexes.size())); @@ -709,7 +709,7 @@ TEST_P(LakePartialUpdateTest, test_partial_update_publish_retry) { .set_txn_id(txn_id) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_tablet_schema->id()) + .set_schema_id(_tablet_schema->id()) .set_slot_descriptors(&_slot_pointers) .build()); ASSERT_OK(delta_writer->open()); @@ -749,7 +749,7 @@ TEST_P(LakePartialUpdateTest, test_concurrent_write_publish) { .set_txn_id(txn_id) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_tablet_schema->id()) + .set_schema_id(_tablet_schema->id()) .build()); ASSERT_OK(delta_writer->open()); ASSERT_OK(delta_writer->write(chunk0, indexes.data(), indexes.size())); @@ -769,7 +769,7 @@ TEST_P(LakePartialUpdateTest, test_concurrent_write_publish) { .set_txn_id(txn_id1) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_tablet_schema->id()) + .set_schema_id(_tablet_schema->id()) .set_slot_descriptors(&_slot_pointers) .build()); ASSERT_OK(delta_writer->open()); @@ -795,7 +795,7 @@ TEST_P(LakePartialUpdateTest, test_concurrent_write_publish) { .set_txn_id(txn_id2) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_tablet_schema->id()) + .set_schema_id(_tablet_schema->id()) .set_slot_descriptors(&_slot_pointers) .build()); ASSERT_OK(delta_writer->open()); @@ -829,7 +829,7 @@ TEST_P(LakePartialUpdateTest, test_batch_publish) { .set_txn_id(txn_id) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_tablet_schema->id()) + .set_schema_id(_tablet_schema->id()) .build()); ASSERT_OK(delta_writer->open()); ASSERT_OK(delta_writer->write(chunk0, indexes.data(), indexes.size())); @@ -853,7 +853,7 @@ TEST_P(LakePartialUpdateTest, test_batch_publish) { .set_txn_id(txn_id) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_tablet_schema->id()) + .set_schema_id(_tablet_schema->id()) .set_slot_descriptors(&_slot_pointers) .build()); ASSERT_OK(delta_writer->open()); @@ -1007,7 +1007,7 @@ TEST_F(LakeIncompleteSortKeyPartialUpdateTest, test_incomplete_sort_key) { .set_txn_id(txn_id) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_tablet_schema->id()) + .set_schema_id(_tablet_schema->id()) .set_slot_descriptors(&_slot_pointers) .build()); ASSERT_OK(delta_writer->open()); @@ -1034,7 +1034,7 @@ TEST_P(LakePartialUpdateTest, test_partial_update_retry_rewrite_check) { .set_txn_id(txn_id) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_tablet_schema->id()) + .set_schema_id(_tablet_schema->id()) .build()); ASSERT_OK(delta_writer->open()); ASSERT_OK(delta_writer->write(chunk0, indexes.data(), indexes.size())); @@ -1057,7 +1057,7 @@ TEST_P(LakePartialUpdateTest, test_partial_update_retry_rewrite_check) { .set_txn_id(txn_id) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_tablet_schema->id()) + .set_schema_id(_tablet_schema->id()) .set_slot_descriptors(&_slot_pointers) .build()); ASSERT_OK(delta_writer->open()); @@ -1116,7 +1116,7 @@ TEST_P(LakePartialUpdateTest, test_partial_update_retry_check_file_exist) { .set_txn_id(txn_id) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_tablet_schema->id()) + .set_schema_id(_tablet_schema->id()) .build()); ASSERT_OK(delta_writer->open()); ASSERT_OK(delta_writer->write(chunk0, indexes.data(), indexes.size())); @@ -1139,7 +1139,7 @@ TEST_P(LakePartialUpdateTest, test_partial_update_retry_check_file_exist) { .set_txn_id(txn_id) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_tablet_schema->id()) + .set_schema_id(_tablet_schema->id()) .set_slot_descriptors(&_slot_pointers) .build()); ASSERT_OK(delta_writer->open()); diff --git a/be/test/storage/lake/primary_key_compaction_task_test.cpp b/be/test/storage/lake/primary_key_compaction_task_test.cpp index 3cbec9a0ef920..3c37d30f3429a 100644 --- a/be/test/storage/lake/primary_key_compaction_task_test.cpp +++ b/be/test/storage/lake/primary_key_compaction_task_test.cpp @@ -185,7 +185,7 @@ TEST_P(LakePrimaryKeyCompactionTest, test1) { .set_txn_id(txn_id) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_tablet_schema->id()) + .set_schema_id(_tablet_schema->id()) .build()); ASSERT_OK(delta_writer->open()); ASSERT_OK(delta_writer->write(chunk0, indexes.data(), indexes.size())); @@ -255,7 +255,7 @@ TEST_P(LakePrimaryKeyCompactionTest, test2) { .set_txn_id(txn_id) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_tablet_schema->id()) + .set_schema_id(_tablet_schema->id()) .build()); ASSERT_OK(delta_writer->open()); ASSERT_OK(delta_writer->write(chunks[i], indexes.data(), indexes.size())); @@ -316,7 +316,7 @@ TEST_P(LakePrimaryKeyCompactionTest, test3) { .set_txn_id(txn_id) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_tablet_schema->id()) + .set_schema_id(_tablet_schema->id()) .build()); ASSERT_OK(delta_writer->open()); if (i == 1) { @@ -375,7 +375,7 @@ TEST_P(LakePrimaryKeyCompactionTest, test_compaction_policy) { .set_txn_id(txn_id) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_tablet_schema->id()) + .set_schema_id(_tablet_schema->id()) .build()); ASSERT_OK(delta_writer->open()); ASSERT_OK(delta_writer->write(chunks[i], indexes.data(), indexes.size())); @@ -424,7 +424,7 @@ TEST_P(LakePrimaryKeyCompactionTest, test_compaction_policy2) { .set_txn_id(txn_id) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_tablet_schema->id()) + .set_schema_id(_tablet_schema->id()) .build()); ASSERT_OK(delta_writer->open()); ASSERT_OK(delta_writer->write(chunks[i], indexes_list[i].data(), indexes_list[i].size())); @@ -442,7 +442,7 @@ TEST_P(LakePrimaryKeyCompactionTest, test_compaction_policy2) { .set_txn_id(txn_id) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_tablet_schema->id()) + .set_schema_id(_tablet_schema->id()) .build()); ASSERT_OK(delta_writer->open()); ASSERT_OK(delta_writer->write(chunks[0], indexes_list[0].data(), indexes_list[0].size())); @@ -494,7 +494,7 @@ TEST_P(LakePrimaryKeyCompactionTest, test_compaction_policy3) { .set_txn_id(txn_id) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_tablet_schema->id()) + .set_schema_id(_tablet_schema->id()) .build()); ASSERT_OK(delta_writer->open()); for (int seg_cnt = 0; seg_cnt <= i; seg_cnt++) { @@ -516,7 +516,7 @@ TEST_P(LakePrimaryKeyCompactionTest, test_compaction_policy3) { .set_txn_id(txn_id) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_tablet_schema->id()) + .set_schema_id(_tablet_schema->id()) .build()); ASSERT_OK(delta_writer->open()); ASSERT_OK(delta_writer->write(chunks[0], indexes_list[0].data(), indexes_list[0].size())); @@ -568,7 +568,7 @@ TEST_P(LakePrimaryKeyCompactionTest, test_compaction_score_by_policy) { .set_txn_id(txn_id) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_tablet_schema->id()) + .set_schema_id(_tablet_schema->id()) .build()); ASSERT_OK(delta_writer->open()); ASSERT_OK(delta_writer->write(chunks[i], indexes.data(), indexes.size())); @@ -622,7 +622,7 @@ TEST_P(LakePrimaryKeyCompactionTest, test_compaction_sorted) { .set_txn_id(txn_id) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_tablet_schema->id()) + .set_schema_id(_tablet_schema->id()) .build()); ASSERT_OK(delta_writer->open()); ASSERT_OK(delta_writer->write(chunks[chunk_write_without_order[i]], indexes.data(), indexes.size())); @@ -697,7 +697,7 @@ TEST_P(LakePrimaryKeyCompactionTest, test_remove_compaction_state) { .set_txn_id(txn_id) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_tablet_schema->id()) + .set_schema_id(_tablet_schema->id()) .build()); ASSERT_OK(delta_writer->open()); ASSERT_OK(delta_writer->write(chunk0, indexes.data(), indexes.size())); @@ -742,7 +742,7 @@ TEST_P(LakePrimaryKeyCompactionTest, test_remove_compaction_state) { .set_txn_id(txn_id) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_tablet_schema->id()) + .set_schema_id(_tablet_schema->id()) .build()); ASSERT_OK(delta_writer->open()); ASSERT_OK(delta_writer->write(chunk0, indexes.data(), indexes.size())); @@ -777,7 +777,7 @@ TEST_P(LakePrimaryKeyCompactionTest, test_abort_txn) { .set_txn_id(txn_id) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_tablet_schema->id()) + .set_schema_id(_tablet_schema->id()) .build()); ASSERT_OK(delta_writer->open()); ASSERT_OK(delta_writer->write(chunk0, indexes.data(), indexes.size())); @@ -847,7 +847,7 @@ TEST_P(LakePrimaryKeyCompactionTest, test_multi_output_seg) { .set_txn_id(txn_id) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_tablet_schema->id()) + .set_schema_id(_tablet_schema->id()) .build()); ASSERT_OK(delta_writer->open()); ASSERT_OK(delta_writer->write(chunks[i], indexes.data(), indexes.size())); @@ -909,7 +909,7 @@ TEST_P(LakePrimaryKeyCompactionTest, test_pk_recover_rowset_order_after_compact) .set_txn_id(txn_id) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_tablet_schema->id()) + .set_schema_id(_tablet_schema->id()) .build()); ASSERT_OK(delta_writer->open()); ASSERT_OK(delta_writer->write(chunks[i], indexes.data(), indexes.size())); diff --git a/be/test/storage/lake/primary_key_publish_test.cpp b/be/test/storage/lake/primary_key_publish_test.cpp index 55e3f915336fa..c36485243c158 100644 --- a/be/test/storage/lake/primary_key_publish_test.cpp +++ b/be/test/storage/lake/primary_key_publish_test.cpp @@ -218,7 +218,7 @@ TEST_P(LakePrimaryKeyPublishTest, test_write_multitime_check_result) { .set_txn_id(txn_id) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_tablet_schema->id()) + .set_schema_id(_tablet_schema->id()) .build()); ASSERT_OK(delta_writer->open()); ASSERT_OK(delta_writer->write(*chunk0, indexes.data(), indexes.size())); @@ -261,7 +261,7 @@ TEST_P(LakePrimaryKeyPublishTest, test_write_fail_retry) { .set_txn_id(txn_id) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_tablet_schema->id()) + .set_schema_id(_tablet_schema->id()) .build()); ASSERT_OK(delta_writer->open()); ASSERT_OK(delta_writer->write(*chunks[i], indexes.data(), indexes.size())); @@ -290,7 +290,7 @@ TEST_P(LakePrimaryKeyPublishTest, test_write_fail_retry) { .set_txn_id(txn_id) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_tablet_schema->id()) + .set_schema_id(_tablet_schema->id()) .build()); ASSERT_OK(delta_writer->open()); ASSERT_OK(delta_writer->write(*chunks[i], indexes.data(), indexes.size())); @@ -309,7 +309,7 @@ TEST_P(LakePrimaryKeyPublishTest, test_write_fail_retry) { .set_txn_id(txn_id) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_tablet_schema->id()) + .set_schema_id(_tablet_schema->id()) .build()); ASSERT_OK(delta_writer->open()); ASSERT_OK(delta_writer->write(*chunks[i], indexes.data(), indexes.size())); @@ -346,7 +346,7 @@ TEST_P(LakePrimaryKeyPublishTest, test_publish_multi_times) { .set_txn_id(txn_id) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_tablet_schema->id()) + .set_schema_id(_tablet_schema->id()) .build()); ASSERT_OK(delta_writer->open()); ASSERT_OK(delta_writer->write(*chunk0, indexes.data(), indexes.size())); @@ -388,7 +388,7 @@ TEST_P(LakePrimaryKeyPublishTest, test_publish_concurrent) { .set_txn_id(txn_id) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_tablet_schema->id()) + .set_schema_id(_tablet_schema->id()) .build()); ASSERT_OK(delta_writer->open()); ASSERT_OK(delta_writer->write(*chunk0, indexes.data(), indexes.size())); @@ -427,7 +427,7 @@ TEST_P(LakePrimaryKeyPublishTest, test_resolve_conflict) { .set_txn_id(txn_id) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_tablet_schema->id()) + .set_schema_id(_tablet_schema->id()) .build()); ASSERT_OK(delta_writer->open()); ASSERT_OK(delta_writer->write(*chunk0, indexes.data(), indexes.size())); @@ -451,7 +451,7 @@ TEST_P(LakePrimaryKeyPublishTest, test_resolve_conflict) { .set_txn_id(txn_id) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_tablet_schema->id()) + .set_schema_id(_tablet_schema->id()) .build()); ASSERT_OK(delta_writer->open()); ASSERT_OK(delta_writer->write(*chunk0, indexes.data(), indexes.size())); @@ -505,7 +505,7 @@ TEST_P(LakePrimaryKeyPublishTest, test_write_read_success_multiple_tablet) { .set_txn_id(txn_id) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_tablet_schema->id()) + .set_schema_id(_tablet_schema->id()) .build()); ASSERT_OK(w->open()); ASSERT_OK(w->write(*chunk_ptr, indexes, indexes_size)); @@ -544,7 +544,7 @@ TEST_P(LakePrimaryKeyPublishTest, test_write_largedata) { .set_txn_id(txn_id) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_tablet_schema->id()) + .set_schema_id(_tablet_schema->id()) .build()); ASSERT_OK(delta_writer->open()); ASSERT_OK(delta_writer->write(*chunk0, indexes.data(), indexes.size())); @@ -580,7 +580,7 @@ TEST_P(LakePrimaryKeyPublishTest, test_recover) { .set_txn_id(txn_id) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_tablet_schema->id()) + .set_schema_id(_tablet_schema->id()) .build()); ASSERT_OK(delta_writer->open()); ASSERT_OK(delta_writer->write(*chunk0, indexes.data(), indexes.size())); @@ -629,7 +629,7 @@ TEST_P(LakePrimaryKeyPublishTest, test_write_rebuild_persistent_index) { .set_txn_id(txn_id) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_tablet_schema->id()) + .set_schema_id(_tablet_schema->id()) .build()); ASSERT_OK(delta_writer->open()); ASSERT_OK(delta_writer->write(*chunk0, indexes.data(), indexes.size())); @@ -673,7 +673,7 @@ TEST_P(LakePrimaryKeyPublishTest, test_abort_txn) { .set_txn_id(txn_id) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_tablet_schema->id()) + .set_schema_id(_tablet_schema->id()) .build()); ASSERT_OK(delta_writer->open()); ASSERT_OK(delta_writer->write(*chunk0, indexes.data(), indexes.size())); @@ -711,7 +711,7 @@ TEST_P(LakePrimaryKeyPublishTest, test_batch_publish) { .set_txn_id(txn_id) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_tablet_schema->id()) + .set_schema_id(_tablet_schema->id()) .set_slot_descriptors(&_slot_pointers) .build()); ASSERT_OK(delta_writer->open()); @@ -727,7 +727,7 @@ TEST_P(LakePrimaryKeyPublishTest, test_batch_publish) { .set_txn_id(txn_id) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_tablet_schema->id()) + .set_schema_id(_tablet_schema->id()) .set_slot_descriptors(&_slot_pointers) .build()); ASSERT_OK(delta_writer->open()); @@ -772,7 +772,7 @@ TEST_P(LakePrimaryKeyPublishTest, test_batch_publish_1) { .set_txn_id(txn_id) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_tablet_schema->id()) + .set_schema_id(_tablet_schema->id()) .set_slot_descriptors(&_slot_pointers) .build()); ASSERT_OK(delta_writer->open()); @@ -788,7 +788,7 @@ TEST_P(LakePrimaryKeyPublishTest, test_batch_publish_1) { .set_txn_id(txn_id) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_tablet_schema->id()) + .set_schema_id(_tablet_schema->id()) .set_slot_descriptors(&_slot_pointers) .build()); ASSERT_OK(delta_writer->open()); @@ -812,7 +812,7 @@ TEST_P(LakePrimaryKeyPublishTest, test_batch_publish_1) { .set_txn_id(txn_id) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_tablet_schema->id()) + .set_schema_id(_tablet_schema->id()) .set_slot_descriptors(&_slot_pointers) .build()); ASSERT_OK(delta_writer->open()); @@ -844,7 +844,7 @@ TEST_P(LakePrimaryKeyPublishTest, test_transform_batch_to_single) { .set_txn_id(txn_id1) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_tablet_schema->id()) + .set_schema_id(_tablet_schema->id()) .set_slot_descriptors(&_slot_pointers) .build()); ASSERT_OK(delta_writer->open()); @@ -860,7 +860,7 @@ TEST_P(LakePrimaryKeyPublishTest, test_transform_batch_to_single) { .set_txn_id(txn_id2) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_tablet_schema->id()) + .set_schema_id(_tablet_schema->id()) .set_slot_descriptors(&_slot_pointers) .build()); ASSERT_OK(delta_writer->open()); diff --git a/be/test/storage/lake/schema_change_test.cpp b/be/test/storage/lake/schema_change_test.cpp index 948d616b32221..9d13db871f0d1 100644 --- a/be/test/storage/lake/schema_change_test.cpp +++ b/be/test/storage/lake/schema_change_test.cpp @@ -250,7 +250,7 @@ TEST_P(SchemaChangeAddColumnTest, test_add_column) { .set_txn_id(txn_id) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_base_tablet_schema->id()) + .set_schema_id(_base_tablet_schema->id()) .build()); ASSERT_OK(delta_writer->open()); ASSERT_OK(delta_writer->write(chunk0, indexes, sizeof(indexes) / sizeof(indexes[0]))); @@ -295,7 +295,7 @@ TEST_P(SchemaChangeAddColumnTest, test_add_column) { .set_txn_id(txn_id) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_new_tablet_schema->id()) + .set_schema_id(_new_tablet_schema->id()) .build()); ASSERT_OK(delta_writer->open()); ASSERT_OK(delta_writer->write(chunk1, indexes, sizeof(indexes) / sizeof(indexes[0]))); @@ -505,7 +505,7 @@ TEST_P(SchemaChangeModifyColumnTypeTest, test_alter_column_type) { .set_txn_id(txn_id) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_base_tablet_schema->id()) + .set_schema_id(_base_tablet_schema->id()) .build()); ASSERT_OK(delta_writer->open()); ASSERT_OK(delta_writer->write(chunk0, indexes, sizeof(indexes) / sizeof(indexes[0]))); @@ -549,7 +549,7 @@ TEST_P(SchemaChangeModifyColumnTypeTest, test_alter_column_type) { .set_txn_id(txn_id) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_new_tablet_schema->id()) + .set_schema_id(_new_tablet_schema->id()) .build()); ASSERT_OK(delta_writer->open()); ASSERT_OK(delta_writer->write(chunk1, indexes, sizeof(indexes) / sizeof(indexes[0]))); @@ -784,7 +784,7 @@ TEST_P(SchemaChangeModifyColumnOrderTest, test_alter_key_order) { .set_txn_id(txn_id) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_base_tablet_schema->id()) + .set_schema_id(_base_tablet_schema->id()) .build()); ASSERT_OK(delta_writer->open()); ASSERT_OK(delta_writer->write(chunk0, indexes.data(), indexes.size())); @@ -818,7 +818,7 @@ TEST_P(SchemaChangeModifyColumnOrderTest, test_alter_key_order) { .set_txn_id(txn_id) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_new_tablet_schema->id()) + .set_schema_id(_new_tablet_schema->id()) .build()); ASSERT_OK(delta_writer->open()); ASSERT_OK(delta_writer->write(chunk1, indexes.data(), indexes.size())); @@ -1058,7 +1058,7 @@ TEST_P(SchemaChangeModifyColumnMultiSegmentOrderTest, test_alter_table) { .set_txn_id(txn_id) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_base_tablet_schema->id()) + .set_schema_id(_base_tablet_schema->id()) .build()); ASSERT_OK(delta_writer->open()); ASSERT_OK(delta_writer->write(chunk0, indexes.data(), indexes.size())); @@ -1271,7 +1271,7 @@ TEST_P(SchemaChangeSortKeyReorderTest1, test_alter_sortkey_reorder_1) { .set_txn_id(txn_id) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_base_tablet_schema->id()) + .set_schema_id(_base_tablet_schema->id()) .build()); ASSERT_OK(delta_writer->open()); ASSERT_OK(delta_writer->write(chunk0, indexes.data(), indexes.size())); @@ -1305,7 +1305,7 @@ TEST_P(SchemaChangeSortKeyReorderTest1, test_alter_sortkey_reorder_1) { .set_txn_id(txn_id) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_new_tablet_schema->id()) + .set_schema_id(_new_tablet_schema->id()) .build()); ASSERT_OK(delta_writer->open()); ASSERT_OK(delta_writer->write(chunk1, indexes.data(), indexes.size())); @@ -1509,7 +1509,7 @@ TEST_P(SchemaChangeSortKeyReorderTest2, test_alter_sortkey_reorder2) { .set_txn_id(txn_id) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_base_tablet_schema->id()) + .set_schema_id(_base_tablet_schema->id()) .build()); ASSERT_OK(delta_writer->open()); ASSERT_OK(delta_writer->write(chunk0, indexes.data(), indexes.size())); @@ -1543,7 +1543,7 @@ TEST_P(SchemaChangeSortKeyReorderTest2, test_alter_sortkey_reorder2) { .set_txn_id(txn_id) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_new_tablet_schema->id()) + .set_schema_id(_new_tablet_schema->id()) .build()); ASSERT_OK(delta_writer->open()); ASSERT_OK(delta_writer->write(chunk1, indexes.data(), indexes.size())); @@ -1745,7 +1745,7 @@ TEST_P(SchemaChangeSortKeyReorderTest3, test_alter_sortkey_reorder3) { .set_txn_id(txn_id) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_base_tablet_schema->id()) + .set_schema_id(_base_tablet_schema->id()) .build()); ASSERT_OK(delta_writer->open()); ASSERT_OK(delta_writer->write(chunk0, indexes.data(), indexes.size())); @@ -1779,7 +1779,7 @@ TEST_P(SchemaChangeSortKeyReorderTest3, test_alter_sortkey_reorder3) { .set_txn_id(txn_id) .set_partition_id(_partition_id) .set_mem_tracker(_mem_tracker.get()) - .set_index_id(_new_tablet_schema->id()) + .set_schema_id(_new_tablet_schema->id()) .build()); ASSERT_OK(delta_writer->open()); ASSERT_OK(delta_writer->write(chunk1, indexes.data(), indexes.size()));