Skip to content

Commit

Permalink
[Feature](3/n) Support fast schema evolution in shared data mode (#42737)
Browse files Browse the repository at this point in the history

Signed-off-by: Alex Zhu <[email protected]>
(cherry picked from commit 3c7f9db)

# Conflicts:
#	be/test/storage/lake/delta_writer_test.cpp
#	be/test/storage/lake/partial_update_test.cpp
  • Loading branch information
sduzh authored and mergify[bot] committed Jul 29, 2024
1 parent 62ecb79 commit 1f6e4e5
Show file tree
Hide file tree
Showing 19 changed files with 239 additions and 148 deletions.
4 changes: 3 additions & 1 deletion be/src/runtime/lake_tablets_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -492,10 +492,12 @@ void LakeTabletsChannel::_flush_stale_memtables() {
}

Status LakeTabletsChannel::_create_delta_writers(const PTabletWriterOpenRequest& params, bool is_incremental) {
int64_t schema_id = 0;
std::vector<SlotDescriptor*>* slots = nullptr;
for (auto& index : _schema->indexes()) {
if (index->index_id == _index_id) {
slots = &index->slots;
schema_id = index->schema_id;
break;
}
}
Expand Down Expand Up @@ -541,7 +543,7 @@ Status LakeTabletsChannel::_create_delta_writers(const PTabletWriterOpenRequest&
.set_table_id(params.table_id())
.set_immutable_tablet_size(params.immutable_tablet_size())
.set_mem_tracker(_mem_tracker)
.set_index_id(_index_id)
.set_schema_id(schema_id)
.build());
_delta_writers.emplace(tablet.tablet_id(), std::move(writer));
tablet_ids.emplace_back(tablet.tablet_id());
Expand Down
2 changes: 1 addition & 1 deletion be/src/storage/lake/async_delta_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ StatusOr<AsyncDeltaWriterBuilder::AsyncDeltaWriterPtr> AsyncDeltaWriterBuilder::
.set_mem_tracker(_mem_tracker)
.set_immutable_tablet_size(_immutable_tablet_size)
.set_miss_auto_increment_column(_miss_auto_increment_column)
.set_index_id(_index_id)
.set_schema_id(_schema_id)
.build());
auto impl = new AsyncDeltaWriterImpl(std::move(writer));
return std::make_unique<AsyncDeltaWriter>(impl);
Expand Down
6 changes: 3 additions & 3 deletions be/src/storage/lake/async_delta_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,8 @@ class AsyncDeltaWriterBuilder {
return *this;
}

AsyncDeltaWriterBuilder& set_index_id(int64_t index_id) {
_index_id = index_id;
AsyncDeltaWriterBuilder& set_schema_id(int64_t schema_id) {
_schema_id = schema_id;
return *this;
}

Expand All @@ -181,7 +181,7 @@ class AsyncDeltaWriterBuilder {
int64_t _txn_id{0};
int64_t _table_id{0};
int64_t _partition_id{0};
int64_t _index_id{0};
int64_t _schema_id{0};
int64_t _tablet_id{0};
const std::vector<SlotDescriptor*>* _slots{nullptr};
int64_t _immutable_tablet_size{0};
Expand Down
24 changes: 12 additions & 12 deletions be/src/storage/lake/delta_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,13 @@ class DeltaWriterImpl {
explicit DeltaWriterImpl(TabletManager* tablet_manager, int64_t tablet_id, int64_t txn_id, int64_t partition_id,
const std::vector<SlotDescriptor*>* slots, std::string merge_condition,
bool miss_auto_increment_column, int64_t table_id, int64_t immutable_tablet_size,
MemTracker* mem_tracker, int64_t max_buffer_size, int64_t index_id)
MemTracker* mem_tracker, int64_t max_buffer_size, int64_t schema_id)
: _tablet_manager(tablet_manager),
_tablet_id(tablet_id),
_txn_id(txn_id),
_table_id(table_id),
_partition_id(partition_id),
_index_id(index_id),
_schema_id(schema_id),
_mem_tracker(mem_tracker),
_slots(slots),
_max_buffer_size(max_buffer_size > 0 ? max_buffer_size : config::write_buffer_size),
Expand All @@ -94,11 +94,11 @@ class DeltaWriterImpl {

DISALLOW_COPY_AND_MOVE(DeltaWriterImpl);

[[nodiscard]] Status open();
Status open();

[[nodiscard]] Status write(const Chunk& chunk, const uint32_t* indexes, uint32_t indexes_size);
Status write(const Chunk& chunk, const uint32_t* indexes, uint32_t indexes_size);

[[nodiscard]] Status finish(DeltaWriter::FinishMode mode);
Status finish(DeltaWriter::FinishMode mode);

void close();

Expand All @@ -110,9 +110,9 @@ class DeltaWriterImpl {

[[nodiscard]] MemTracker* mem_tracker() { return _mem_tracker; }

[[nodiscard]] Status flush();
Status flush();

[[nodiscard]] Status flush_async();
Status flush_async();

int64_t queueing_memtable_num() const;

Expand Down Expand Up @@ -146,7 +146,7 @@ class DeltaWriterImpl {
const int64_t _txn_id;
const int64_t _table_id;
const int64_t _partition_id;
const int64_t _index_id;
const int64_t _schema_id;
MemTracker* const _mem_tracker;

const std::vector<SlotDescriptor*>* const _slots;
Expand Down Expand Up @@ -282,7 +282,7 @@ inline Status DeltaWriterImpl::init_tablet_schema() {
return Status::OK();
}
ASSIGN_OR_RETURN(auto tablet, _tablet_manager->get_tablet(_tablet_id));
auto res = tablet.get_schema_by_index_id(_index_id);
auto res = tablet.get_schema_by_id(_schema_id);
if (res.ok()) {
_tablet_schema = std::move(res).value();
return Status::OK();
Expand Down Expand Up @@ -714,12 +714,12 @@ StatusOr<DeltaWriterBuilder::DeltaWriterPtr> DeltaWriterBuilder::build() {
if (UNLIKELY(_miss_auto_increment_column && _table_id == 0)) {
return Status::InvalidArgument("must set table_id when miss_auto_increment_column is true");
}
if (UNLIKELY(_index_id == 0)) {
return Status::InvalidArgument("index_id not set");
if (UNLIKELY(_schema_id == 0)) {
return Status::InvalidArgument("schema_id not set");
}
auto impl = new DeltaWriterImpl(_tablet_mgr, _tablet_id, _txn_id, _partition_id, _slots, _merge_condition,
_miss_auto_increment_column, _table_id, _immutable_tablet_size, _mem_tracker,
_max_buffer_size, _index_id);
_max_buffer_size, _schema_id);
return std::make_unique<DeltaWriter>(impl);
}

Expand Down
16 changes: 8 additions & 8 deletions be/src/storage/lake/delta_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,21 +54,21 @@ class DeltaWriter {
DISALLOW_COPY_AND_MOVE(DeltaWriter);

// NOTE: It's ok to invoke this method in a bthread, there is no I/O operation in this method.
[[nodiscard]] Status open();
Status open();

// NOTE: Do NOT invoke this method in a bthread.
[[nodiscard]] Status write(const Chunk& chunk, const uint32_t* indexes, uint32_t indexes_size);
Status write(const Chunk& chunk, const uint32_t* indexes, uint32_t indexes_size);

// NOTE: Do NOT invoke this method in a bthread.
[[nodiscard]] Status finish(FinishMode mode = kWriteTxnLog);
Status finish(FinishMode mode = kWriteTxnLog);

// Manual flush, mainly used in UT
// NOTE: Do NOT invoke this method in a bthread.
[[nodiscard]] Status flush();
Status flush();

// Manual flush, mainly used in UT
// NOTE: Do NOT invoke this method in a bthread.
[[nodiscard]] Status flush_async();
Status flush_async();

// NOTE: Do NOT invoke this method in a bthread unless you are sure that `write()` has never been called.
void close();
Expand Down Expand Up @@ -169,8 +169,8 @@ class DeltaWriterBuilder {
return *this;
}

DeltaWriterBuilder& set_index_id(int64_t index_id) {
_index_id = index_id;
DeltaWriterBuilder& set_schema_id(int64_t schema_id) {
_schema_id = schema_id;
return *this;
}

Expand All @@ -181,7 +181,7 @@ class DeltaWriterBuilder {
int64_t _txn_id{0};
int64_t _table_id{0};
int64_t _partition_id{0};
int64_t _index_id{0};
int64_t _schema_id{0};
int64_t _tablet_id{0};
const std::vector<SlotDescriptor*>* _slots{nullptr};
std::string _merge_condition{};
Expand Down
2 changes: 1 addition & 1 deletion be/src/storage/lake/schema_change.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ Status SortedSchemaChange::process(RowsetPtr rowset, RowsetMetadata* new_rowset_
.set_txn_id(_txn_id)
.set_max_buffer_size(_max_buffer_size)
.set_mem_tracker(CurrentThread::mem_tracker())
.set_index_id(_new_tablet_schema->id()) // TODO: pass tablet schema directly
.set_schema_id(_new_tablet_schema->id()) // TODO: pass tablet schema directly
.build());
RETURN_IF_ERROR(writer->open());
DeferOp defer([&]() { writer->close(); });
Expand Down
4 changes: 2 additions & 2 deletions be/src/storage/lake/tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,8 @@ StatusOr<std::shared_ptr<const TabletSchema>> Tablet::get_schema() {
return _mgr->get_tablet_schema(_id, &_version_hint);
}

StatusOr<std::shared_ptr<const TabletSchema>> Tablet::get_schema_by_index_id(int64_t index_id) {
return _mgr->get_tablet_schema_by_index_id(_id, index_id);
StatusOr<std::shared_ptr<const TabletSchema>> Tablet::get_schema_by_id(int64_t index_id) {
return _mgr->get_tablet_schema_by_id(_id, index_id);
}

StatusOr<std::vector<RowsetPtr>> Tablet::get_rowsets(int64_t version) {
Expand Down
2 changes: 1 addition & 1 deletion be/src/storage/lake/tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ class Tablet {
// NOTE: This method may update the version hint
StatusOr<std::shared_ptr<const TabletSchema>> get_schema();

StatusOr<std::shared_ptr<const TabletSchema>> get_schema_by_index_id(int64_t index_id);
StatusOr<std::shared_ptr<const TabletSchema>> get_schema_by_id(int64_t index_id);

StatusOr<std::vector<RowsetPtr>> get_rowsets(int64_t version);

Expand Down
8 changes: 4 additions & 4 deletions be/src/storage/lake/tablet_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,7 @@ StatusOr<TabletSchemaPtr> TabletManager::get_tablet_schema(int64_t tablet_id, in
auto index_id_iter = properties.find("indexId");
if (index_id_iter != properties.end()) {
auto index_id = std::atol(index_id_iter->second.data());
auto res = get_tablet_schema_by_index_id(tablet_id, index_id);
auto res = get_tablet_schema_by_id(tablet_id, index_id);
if (res.ok()) {
return res;
} else if (res.status().is_not_found()) {
Expand Down Expand Up @@ -469,17 +469,17 @@ StatusOr<TabletSchemaPtr> TabletManager::get_tablet_schema(int64_t tablet_id, in
return schema;
}

StatusOr<TabletSchemaPtr> TabletManager::get_tablet_schema_by_index_id(int64_t tablet_id, int64_t index_id) {
StatusOr<TabletSchemaPtr> TabletManager::get_tablet_schema_by_id(int64_t tablet_id, int64_t index_id) {
auto global_cache_key = global_schema_cache_key(index_id);
auto schema = _metacache->lookup_tablet_schema(global_cache_key);
TEST_SYNC_POINT_CALLBACK("get_tablet_schema_by_index_id.1", &schema);
TEST_SYNC_POINT_CALLBACK("get_tablet_schema_by_id.1", &schema);
if (schema != nullptr) {
return schema;
}
// else: Cache miss, read the schema file
auto schema_file_path = join_path(tablet_root_location(tablet_id), schema_filename(index_id));
auto schema_or = load_and_parse_schema_file(schema_file_path);
TEST_SYNC_POINT_CALLBACK("get_tablet_schema_by_index_id.2", &schema_or);
TEST_SYNC_POINT_CALLBACK("get_tablet_schema_by_id.2", &schema_or);
if (schema_or.ok()) {
VLOG(3) << "Got tablet schema of id " << index_id << " for tablet " << tablet_id;
schema = std::move(schema_or).value();
Expand Down
2 changes: 1 addition & 1 deletion be/src/storage/lake/tablet_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ class TabletManager {
Status create_schema_file(int64_t tablet_id, const TabletSchemaPB& schema_pb);
StatusOr<TabletSchemaPtr> load_and_parse_schema_file(const std::string& path);
StatusOr<TabletSchemaPtr> get_tablet_schema(int64_t tablet_id, int64_t* version_hint = nullptr);
StatusOr<TabletSchemaPtr> get_tablet_schema_by_index_id(int64_t tablet_id, int64_t index_id);
StatusOr<TabletSchemaPtr> get_tablet_schema_by_id(int64_t tablet_id, int64_t index_id);

StatusOr<TabletMetadataPtr> load_tablet_metadata(const std::string& metadata_location, bool fill_cache);
StatusOr<TxnLogPtr> load_txn_log(const std::string& txn_log_location, bool fill_cache);
Expand Down
22 changes: 11 additions & 11 deletions be/test/storage/lake/async_delta_writer_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ TEST_F(LakeAsyncDeltaWriterTest, test_open) {
.set_txn_id(txn_id)
.set_partition_id(_partition_id)
.set_mem_tracker(_mem_tracker.get())
.set_index_id(_tablet_schema->id())
.set_schema_id(_tablet_schema->id())
.build());
ASSERT_OK(delta_writer->open());
delta_writer->close();
Expand All @@ -112,7 +112,7 @@ TEST_F(LakeAsyncDeltaWriterTest, test_open) {
.set_txn_id(txn_id)
.set_partition_id(_partition_id)
.set_mem_tracker(_mem_tracker.get())
.set_index_id(_tablet_schema->id())
.set_schema_id(_tablet_schema->id())
.build());
ASSERT_OK(delta_writer->open());
ASSERT_OK(delta_writer->open());
Expand All @@ -129,7 +129,7 @@ TEST_F(LakeAsyncDeltaWriterTest, test_open) {
.set_txn_id(txn_id)
.set_partition_id(_partition_id)
.set_mem_tracker(_mem_tracker.get())
.set_index_id(_tablet_schema->id())
.set_schema_id(_tablet_schema->id())
.build());
auto t1 = std::thread([&]() {
for (int i = 0; i < 10000; i++) {
Expand Down Expand Up @@ -165,7 +165,7 @@ TEST_F(LakeAsyncDeltaWriterTest, test_write) {
.set_txn_id(txn_id)
.set_partition_id(_partition_id)
.set_mem_tracker(_mem_tracker.get())
.set_index_id(_tablet_schema->id())
.set_schema_id(_tablet_schema->id())
.build());
ASSERT_OK(delta_writer->open());
// Call open() again
Expand Down Expand Up @@ -249,7 +249,7 @@ TEST_F(LakeAsyncDeltaWriterTest, test_write_concurrently) {
.set_txn_id(txn_id)
.set_partition_id(_partition_id)
.set_mem_tracker(_mem_tracker.get())
.set_index_id(_tablet_schema->id())
.set_schema_id(_tablet_schema->id())
.build());

ASSERT_OK(delta_writer->open());
Expand Down Expand Up @@ -351,7 +351,7 @@ TEST_F(LakeAsyncDeltaWriterTest, test_write_after_close) {
.set_txn_id(txn_id)
.set_partition_id(_partition_id)
.set_mem_tracker(_mem_tracker.get())
.set_index_id(_tablet_schema->id())
.set_schema_id(_tablet_schema->id())
.build());
ASSERT_OK(delta_writer->open());

Expand Down Expand Up @@ -387,7 +387,7 @@ TEST_F(LakeAsyncDeltaWriterTest, test_finish_after_close) {
.set_txn_id(txn_id)
.set_partition_id(_partition_id)
.set_mem_tracker(_mem_tracker.get())
.set_index_id(_tablet_schema->id())
.set_schema_id(_tablet_schema->id())
.build());
ASSERT_OK(delta_writer->open());

Expand All @@ -413,7 +413,7 @@ TEST_F(LakeAsyncDeltaWriterTest, test_close) {
.set_txn_id(txn_id)
.set_partition_id(_partition_id)
.set_mem_tracker(_mem_tracker.get())
.set_index_id(_tablet_schema->id())
.set_schema_id(_tablet_schema->id())
.build());
ASSERT_OK(delta_writer->open());

Expand All @@ -430,7 +430,7 @@ TEST_F(LakeAsyncDeltaWriterTest, test_close) {
.set_txn_id(txn_id)
.set_partition_id(_partition_id)
.set_mem_tracker(_mem_tracker.get())
.set_index_id(_tablet_schema->id())
.set_schema_id(_tablet_schema->id())
.build());
ASSERT_OK(delta_writer->open());
auto t1 = std::thread([&]() {
Expand All @@ -457,7 +457,7 @@ TEST_F(LakeAsyncDeltaWriterTest, test_open_after_close) {
.set_txn_id(txn_id)
.set_partition_id(_partition_id)
.set_mem_tracker(_mem_tracker.get())
.set_index_id(_tablet_schema->id())
.set_schema_id(_tablet_schema->id())
.build());
ASSERT_OK(delta_writer->open());
delta_writer->close();
Expand Down Expand Up @@ -488,7 +488,7 @@ TEST_F(LakeAsyncDeltaWriterTest, test_concurrent_write_and_close) {
.set_txn_id(txn_id)
.set_partition_id(_partition_id)
.set_mem_tracker(_mem_tracker.get())
.set_index_id(_tablet_schema->id())
.set_schema_id(_tablet_schema->id())
.build());
ASSERT_OK(delta_writer->open());

Expand Down
Loading

0 comments on commit 1f6e4e5

Please sign in to comment.