Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature](3/n) Support fast schema evolution in shared data mode #42737

Merged
merged 1 commit into from
Mar 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion be/src/runtime/lake_tablets_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -482,10 +482,12 @@ void LakeTabletsChannel::_flush_stale_memtables() {
}

Status LakeTabletsChannel::_create_delta_writers(const PTabletWriterOpenRequest& params, bool is_incremental) {
int64_t schema_id = 0;
std::vector<SlotDescriptor*>* slots = nullptr;
for (auto& index : _schema->indexes()) {
if (index->index_id == _index_id) {
slots = &index->slots;
schema_id = index->schema_id;
break;
}
}
Expand Down Expand Up @@ -531,7 +533,7 @@ Status LakeTabletsChannel::_create_delta_writers(const PTabletWriterOpenRequest&
.set_table_id(params.table_id())
.set_immutable_tablet_size(params.immutable_tablet_size())
.set_mem_tracker(_mem_tracker)
.set_index_id(_index_id)
.set_schema_id(schema_id)
.build());
_delta_writers.emplace(tablet.tablet_id(), std::move(writer));
tablet_ids.emplace_back(tablet.tablet_id());
Expand Down
2 changes: 1 addition & 1 deletion be/src/storage/lake/async_delta_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ StatusOr<AsyncDeltaWriterBuilder::AsyncDeltaWriterPtr> AsyncDeltaWriterBuilder::
.set_mem_tracker(_mem_tracker)
.set_immutable_tablet_size(_immutable_tablet_size)
.set_miss_auto_increment_column(_miss_auto_increment_column)
.set_index_id(_index_id)
.set_schema_id(_schema_id)
.build());
auto impl = new AsyncDeltaWriterImpl(std::move(writer));
return std::make_unique<AsyncDeltaWriter>(impl);
Expand Down
6 changes: 3 additions & 3 deletions be/src/storage/lake/async_delta_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,8 @@ class AsyncDeltaWriterBuilder {
return *this;
}

AsyncDeltaWriterBuilder& set_index_id(int64_t index_id) {
_index_id = index_id;
AsyncDeltaWriterBuilder& set_schema_id(int64_t schema_id) {
_schema_id = schema_id;
return *this;
}

Expand All @@ -181,7 +181,7 @@ class AsyncDeltaWriterBuilder {
int64_t _txn_id{0};
int64_t _table_id{0};
int64_t _partition_id{0};
int64_t _index_id{0};
int64_t _schema_id{0};
int64_t _tablet_id{0};
const std::vector<SlotDescriptor*>* _slots{nullptr};
int64_t _immutable_tablet_size{0};
Expand Down
24 changes: 12 additions & 12 deletions be/src/storage/lake/delta_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,13 @@ class DeltaWriterImpl {
explicit DeltaWriterImpl(TabletManager* tablet_manager, int64_t tablet_id, int64_t txn_id, int64_t partition_id,
const std::vector<SlotDescriptor*>* slots, std::string merge_condition,
bool miss_auto_increment_column, int64_t table_id, int64_t immutable_tablet_size,
MemTracker* mem_tracker, int64_t max_buffer_size, int64_t index_id)
MemTracker* mem_tracker, int64_t max_buffer_size, int64_t schema_id)
: _tablet_manager(tablet_manager),
_tablet_id(tablet_id),
_txn_id(txn_id),
_table_id(table_id),
_partition_id(partition_id),
_index_id(index_id),
_schema_id(schema_id),
_mem_tracker(mem_tracker),
_slots(slots),
_max_buffer_size(max_buffer_size > 0 ? max_buffer_size : config::write_buffer_size),
Expand All @@ -94,11 +94,11 @@ class DeltaWriterImpl {

DISALLOW_COPY_AND_MOVE(DeltaWriterImpl);

[[nodiscard]] Status open();
Status open();

[[nodiscard]] Status write(const Chunk& chunk, const uint32_t* indexes, uint32_t indexes_size);
Status write(const Chunk& chunk, const uint32_t* indexes, uint32_t indexes_size);

[[nodiscard]] Status finish(DeltaWriter::FinishMode mode);
Status finish(DeltaWriter::FinishMode mode);

void close();

Expand All @@ -110,9 +110,9 @@ class DeltaWriterImpl {

[[nodiscard]] MemTracker* mem_tracker() { return _mem_tracker; }

[[nodiscard]] Status flush();
Status flush();

[[nodiscard]] Status flush_async();
Status flush_async();

int64_t queueing_memtable_num() const;

Expand Down Expand Up @@ -146,7 +146,7 @@ class DeltaWriterImpl {
const int64_t _txn_id;
const int64_t _table_id;
const int64_t _partition_id;
const int64_t _index_id;
const int64_t _schema_id;
MemTracker* const _mem_tracker;

const std::vector<SlotDescriptor*>* const _slots;
Expand Down Expand Up @@ -291,7 +291,7 @@ inline Status DeltaWriterImpl::init_tablet_schema() {
return Status::OK();
}
ASSIGN_OR_RETURN(auto tablet, _tablet_manager->get_tablet(_tablet_id));
auto res = tablet.get_schema_by_index_id(_index_id);
auto res = tablet.get_schema_by_id(_schema_id);
if (res.ok()) {
_tablet_schema = std::move(res).value();
return Status::OK();
Expand Down Expand Up @@ -722,12 +722,12 @@ StatusOr<DeltaWriterBuilder::DeltaWriterPtr> DeltaWriterBuilder::build() {
if (UNLIKELY(_miss_auto_increment_column && _table_id == 0)) {
return Status::InvalidArgument("must set table_id when miss_auto_increment_column is true");
}
if (UNLIKELY(_index_id == 0)) {
return Status::InvalidArgument("index_id not set");
if (UNLIKELY(_schema_id == 0)) {
return Status::InvalidArgument("schema_id not set");
}
auto impl = new DeltaWriterImpl(_tablet_mgr, _tablet_id, _txn_id, _partition_id, _slots, _merge_condition,
_miss_auto_increment_column, _table_id, _immutable_tablet_size, _mem_tracker,
_max_buffer_size, _index_id);
_max_buffer_size, _schema_id);
return std::make_unique<DeltaWriter>(impl);
}

sduzh marked this conversation as resolved.
Show resolved Hide resolved
Expand Down
16 changes: 8 additions & 8 deletions be/src/storage/lake/delta_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,21 +54,21 @@ class DeltaWriter {
DISALLOW_COPY_AND_MOVE(DeltaWriter);

// NOTE: It's ok to invoke this method in a bthread, there is no I/O operation in this method.
[[nodiscard]] Status open();
Status open();

// NOTE: Do NOT invoke this method in a bthread.
[[nodiscard]] Status write(const Chunk& chunk, const uint32_t* indexes, uint32_t indexes_size);
Status write(const Chunk& chunk, const uint32_t* indexes, uint32_t indexes_size);

// NOTE: Do NOT invoke this method in a bthread.
[[nodiscard]] Status finish(FinishMode mode = kWriteTxnLog);
Status finish(FinishMode mode = kWriteTxnLog);

// Manual flush, mainly used in UT
// NOTE: Do NOT invoke this method in a bthread.
[[nodiscard]] Status flush();
Status flush();

// Manual flush, mainly used in UT
// NOTE: Do NOT invoke this method in a bthread.
[[nodiscard]] Status flush_async();
Status flush_async();

// NOTE: Do NOT invoke this method in a bthread unless you are sure that `write()` has never been called.
void close();
Expand Down Expand Up @@ -169,8 +169,8 @@ class DeltaWriterBuilder {
return *this;
}

DeltaWriterBuilder& set_index_id(int64_t index_id) {
_index_id = index_id;
DeltaWriterBuilder& set_schema_id(int64_t schema_id) {
_schema_id = schema_id;
return *this;
}

Expand All @@ -181,7 +181,7 @@ class DeltaWriterBuilder {
int64_t _txn_id{0};
int64_t _table_id{0};
int64_t _partition_id{0};
int64_t _index_id{0};
int64_t _schema_id{0};
int64_t _tablet_id{0};
const std::vector<SlotDescriptor*>* _slots{nullptr};
std::string _merge_condition{};
Expand Down
2 changes: 1 addition & 1 deletion be/src/storage/lake/schema_change.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ Status SortedSchemaChange::process(RowsetPtr rowset, RowsetMetadata* new_rowset_
.set_txn_id(_txn_id)
.set_max_buffer_size(_max_buffer_size)
.set_mem_tracker(CurrentThread::mem_tracker())
.set_index_id(_new_tablet_schema->id()) // TODO: pass tablet schema directly
.set_schema_id(_new_tablet_schema->id()) // TODO: pass tablet schema directly
.build());
RETURN_IF_ERROR(writer->open());
DeferOp defer([&]() { writer->close(); });
Expand Down
4 changes: 2 additions & 2 deletions be/src/storage/lake/tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,8 @@ StatusOr<std::shared_ptr<const TabletSchema>> Tablet::get_schema() {
return _mgr->get_tablet_schema(_id, &_version_hint);
}

StatusOr<std::shared_ptr<const TabletSchema>> Tablet::get_schema_by_index_id(int64_t index_id) {
return _mgr->get_tablet_schema_by_index_id(_id, index_id);
StatusOr<std::shared_ptr<const TabletSchema>> Tablet::get_schema_by_id(int64_t index_id) {
return _mgr->get_tablet_schema_by_id(_id, index_id);
}

StatusOr<std::vector<RowsetPtr>> Tablet::get_rowsets(int64_t version) {
Expand Down
2 changes: 1 addition & 1 deletion be/src/storage/lake/tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ class Tablet {
// NOTE: This method may update the version hint
StatusOr<std::shared_ptr<const TabletSchema>> get_schema();

StatusOr<std::shared_ptr<const TabletSchema>> get_schema_by_index_id(int64_t index_id);
StatusOr<std::shared_ptr<const TabletSchema>> get_schema_by_id(int64_t index_id);

StatusOr<std::vector<RowsetPtr>> get_rowsets(int64_t version);

Expand Down
8 changes: 4 additions & 4 deletions be/src/storage/lake/tablet_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ StatusOr<TabletSchemaPtr> TabletManager::get_tablet_schema(int64_t tablet_id, in
auto index_id_iter = properties.find("indexId");
if (index_id_iter != properties.end()) {
auto index_id = std::atol(index_id_iter->second.data());
auto res = get_tablet_schema_by_index_id(tablet_id, index_id);
auto res = get_tablet_schema_by_id(tablet_id, index_id);
if (res.ok()) {
return res;
} else if (res.status().is_not_found()) {
Expand Down Expand Up @@ -468,17 +468,17 @@ StatusOr<TabletSchemaPtr> TabletManager::get_tablet_schema(int64_t tablet_id, in
return schema;
}

StatusOr<TabletSchemaPtr> TabletManager::get_tablet_schema_by_index_id(int64_t tablet_id, int64_t index_id) {
StatusOr<TabletSchemaPtr> TabletManager::get_tablet_schema_by_id(int64_t tablet_id, int64_t index_id) {
auto global_cache_key = global_schema_cache_key(index_id);
auto schema = _metacache->lookup_tablet_schema(global_cache_key);
TEST_SYNC_POINT_CALLBACK("get_tablet_schema_by_index_id.1", &schema);
TEST_SYNC_POINT_CALLBACK("get_tablet_schema_by_id.1", &schema);
if (schema != nullptr) {
return schema;
}
// else: Cache miss, read the schema file
auto schema_file_path = join_path(tablet_root_location(tablet_id), schema_filename(index_id));
auto schema_or = load_and_parse_schema_file(schema_file_path);
TEST_SYNC_POINT_CALLBACK("get_tablet_schema_by_index_id.2", &schema_or);
TEST_SYNC_POINT_CALLBACK("get_tablet_schema_by_id.2", &schema_or);
if (schema_or.ok()) {
VLOG(3) << "Got tablet schema of id " << index_id << " for tablet " << tablet_id;
schema = std::move(schema_or).value();
Expand Down
2 changes: 1 addition & 1 deletion be/src/storage/lake/tablet_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ class TabletManager {
Status create_schema_file(int64_t tablet_id, const TabletSchemaPB& schema_pb);
StatusOr<TabletSchemaPtr> load_and_parse_schema_file(const std::string& path);
StatusOr<TabletSchemaPtr> get_tablet_schema(int64_t tablet_id, int64_t* version_hint = nullptr);
StatusOr<TabletSchemaPtr> get_tablet_schema_by_index_id(int64_t tablet_id, int64_t index_id);
StatusOr<TabletSchemaPtr> get_tablet_schema_by_id(int64_t tablet_id, int64_t index_id);

StatusOr<TabletMetadataPtr> load_tablet_metadata(const std::string& metadata_location, bool fill_cache);
StatusOr<TxnLogPtr> load_txn_log(const std::string& txn_log_location, bool fill_cache);
Expand Down
22 changes: 11 additions & 11 deletions be/test/storage/lake/async_delta_writer_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ TEST_F(LakeAsyncDeltaWriterTest, test_open) {
.set_txn_id(txn_id)
.set_partition_id(_partition_id)
.set_mem_tracker(_mem_tracker.get())
.set_index_id(_tablet_schema->id())
.set_schema_id(_tablet_schema->id())
.build());
ASSERT_OK(delta_writer->open());
delta_writer->close();
Expand All @@ -112,7 +112,7 @@ TEST_F(LakeAsyncDeltaWriterTest, test_open) {
.set_txn_id(txn_id)
.set_partition_id(_partition_id)
.set_mem_tracker(_mem_tracker.get())
.set_index_id(_tablet_schema->id())
.set_schema_id(_tablet_schema->id())
.build());
ASSERT_OK(delta_writer->open());
ASSERT_OK(delta_writer->open());
Expand All @@ -129,7 +129,7 @@ TEST_F(LakeAsyncDeltaWriterTest, test_open) {
.set_txn_id(txn_id)
.set_partition_id(_partition_id)
.set_mem_tracker(_mem_tracker.get())
.set_index_id(_tablet_schema->id())
.set_schema_id(_tablet_schema->id())
.build());
auto t1 = std::thread([&]() {
for (int i = 0; i < 10000; i++) {
Expand Down Expand Up @@ -165,7 +165,7 @@ TEST_F(LakeAsyncDeltaWriterTest, test_write) {
.set_txn_id(txn_id)
.set_partition_id(_partition_id)
.set_mem_tracker(_mem_tracker.get())
.set_index_id(_tablet_schema->id())
.set_schema_id(_tablet_schema->id())
.build());
ASSERT_OK(delta_writer->open());
// Call open() again
Expand Down Expand Up @@ -249,7 +249,7 @@ TEST_F(LakeAsyncDeltaWriterTest, test_write_concurrently) {
.set_txn_id(txn_id)
.set_partition_id(_partition_id)
.set_mem_tracker(_mem_tracker.get())
.set_index_id(_tablet_schema->id())
.set_schema_id(_tablet_schema->id())
.build());

ASSERT_OK(delta_writer->open());
Expand Down Expand Up @@ -351,7 +351,7 @@ TEST_F(LakeAsyncDeltaWriterTest, test_write_after_close) {
.set_txn_id(txn_id)
.set_partition_id(_partition_id)
.set_mem_tracker(_mem_tracker.get())
.set_index_id(_tablet_schema->id())
.set_schema_id(_tablet_schema->id())
.build());
ASSERT_OK(delta_writer->open());

Expand Down Expand Up @@ -387,7 +387,7 @@ TEST_F(LakeAsyncDeltaWriterTest, test_finish_after_close) {
.set_txn_id(txn_id)
.set_partition_id(_partition_id)
.set_mem_tracker(_mem_tracker.get())
.set_index_id(_tablet_schema->id())
.set_schema_id(_tablet_schema->id())
.build());
ASSERT_OK(delta_writer->open());

Expand All @@ -413,7 +413,7 @@ TEST_F(LakeAsyncDeltaWriterTest, test_close) {
.set_txn_id(txn_id)
.set_partition_id(_partition_id)
.set_mem_tracker(_mem_tracker.get())
.set_index_id(_tablet_schema->id())
.set_schema_id(_tablet_schema->id())
.build());
ASSERT_OK(delta_writer->open());

Expand All @@ -430,7 +430,7 @@ TEST_F(LakeAsyncDeltaWriterTest, test_close) {
.set_txn_id(txn_id)
.set_partition_id(_partition_id)
.set_mem_tracker(_mem_tracker.get())
.set_index_id(_tablet_schema->id())
.set_schema_id(_tablet_schema->id())
.build());
ASSERT_OK(delta_writer->open());
auto t1 = std::thread([&]() {
Expand All @@ -457,7 +457,7 @@ TEST_F(LakeAsyncDeltaWriterTest, test_open_after_close) {
.set_txn_id(txn_id)
.set_partition_id(_partition_id)
.set_mem_tracker(_mem_tracker.get())
.set_index_id(_tablet_schema->id())
.set_schema_id(_tablet_schema->id())
.build());
ASSERT_OK(delta_writer->open());
delta_writer->close();
Expand Down Expand Up @@ -488,7 +488,7 @@ TEST_F(LakeAsyncDeltaWriterTest, test_concurrent_write_and_close) {
.set_txn_id(txn_id)
.set_partition_id(_partition_id)
.set_mem_tracker(_mem_tracker.get())
.set_index_id(_tablet_schema->id())
.set_schema_id(_tablet_schema->id())
.build());
ASSERT_OK(delta_writer->open());

Expand Down
Loading
Loading