Skip to content

Commit

Permalink
Fix ttl-actualization of locked portions (#14028)
Browse files Browse the repository at this point in the history
  • Loading branch information
swalrus1 authored Jan 31, 2025
1 parent 1efdace commit 5f5960b
Show file tree
Hide file tree
Showing 11 changed files with 271 additions and 197 deletions.
308 changes: 148 additions & 160 deletions ydb/core/kqp/ut/olap/tiering_ut.cpp

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions ydb/core/testlib/cs_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ class THelperSchemaless : public NCommon::THelper {
void SendDataViaActorSystem(TString testTable, std::shared_ptr<arrow::RecordBatch> batch, const Ydb::StatusIds_StatusCode& expectedStatus = Ydb::StatusIds::SUCCESS) const;

virtual std::shared_ptr<arrow::RecordBatch> TestArrowBatch(ui64 pathIdBegin, ui64 tsBegin, size_t rowCount, const ui64 tsStepUs = 1) const = 0;
virtual ~THelperSchemaless() = default;
};

class THelper: public THelperSchemaless {
Expand Down
21 changes: 3 additions & 18 deletions ydb/core/tx/columnshard/data_locks/manager/manager.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include "manager.h"

#include <ydb/library/actors/core/log.h>

namespace NKikimr::NOlap::NDataLocks {
Expand All @@ -17,28 +18,12 @@ void TManager::UnregisterLock(const TString& processId) {

std::optional<TString> TManager::IsLocked(
const TPortionInfo& portion, const ELockCategory lockCategory, const THashSet<TString>& excludedLocks) const {
for (auto&& i : ProcessLocks) {
if (excludedLocks.contains(i.first)) {
continue;
}
if (auto lockName = i.second->IsLocked(portion, lockCategory, excludedLocks)) {
return lockName;
}
}
return {};
return IsLockedImpl(portion, lockCategory, excludedLocks);
}

std::optional<TString> TManager::IsLocked(
const TGranuleMeta& granule, const ELockCategory lockCategory, const THashSet<TString>& excludedLocks) const {
for (auto&& i : ProcessLocks) {
if (excludedLocks.contains(i.first)) {
continue;
}
if (auto lockName = i.second->IsLocked(granule, lockCategory, excludedLocks)) {
return lockName;
}
}
return {};
return IsLockedImpl(granule, lockCategory, excludedLocks);
}

std::optional<TString> TManager::IsLocked(const std::shared_ptr<const TPortionInfo>& portion, const ELockCategory lockCategory,
Expand Down
30 changes: 30 additions & 0 deletions ydb/core/tx/columnshard/data_locks/manager/manager.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
#pragma once
#include <ydb/core/tx/columnshard/data_locks/locks/abstract.h>
#include <ydb/core/tx/columnshard/hooks/abstract/abstract.h>

#include <util/generic/hash.h>
#include <util/generic/string.h>
#include <util/string/builder.h>

#include <optional>

namespace NKikimr::NOlap::NDataLocks {
Expand All @@ -11,6 +15,32 @@ class TManager {
THashMap<TString, std::shared_ptr<ILock>> ProcessLocks;
std::shared_ptr<TAtomicCounter> StopFlag = std::make_shared<TAtomicCounter>(0);
void UnregisterLock(const TString& processId);

private:
// Shared implementation behind the public IsLocked overloads.
// Looks for the first lock that covers the given object: locks registered in this manager
// (ProcessLocks) are checked first, then the locks exposed by the current column shard
// controller through GetExternalDataLocks().
// Locks whose name is listed in excludedLocks are skipped.
// Returns the value produced by the matching lock's IsLocked call, or an empty optional when
// no lock matches.
template <typename TObject>
std::optional<TString> IsLockedImpl(const TObject& portion, const ELockCategory lockCategory, const THashSet<TString>& excludedLocks) const {
    // Scans a single name -> lock registry and stops at the first non-excluded lock that reports the object as locked.
    const auto findLockIn = [&](const auto& registry) -> std::optional<TString> {
        for (const auto& [lockId, lockPtr] : registry) {
            if (excludedLocks.contains(lockId)) {
                continue;
            }
            if (auto holder = lockPtr->IsLocked(portion, lockCategory, excludedLocks)) {
                return holder;
            }
        }
        return std::nullopt;
    };

    if (auto holder = findLockIn(ProcessLocks)) {
        return holder;
    }
    // External locks are requested only when no process lock matched.
    return findLockIn(NYDBTest::TControllers::GetColumnShardController()->GetExternalDataLocks());
}

public:
TManager() = default;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@ TTieringProcessContext::TTieringProcessContext(const ui64 memoryUsageLimit, cons

}

bool TTieringProcessContext::AddPortion(
TTieringProcessContext::EAddPortionResult TTieringProcessContext::AddPortion(
const std::shared_ptr<const TPortionInfo>& info, TPortionEvictionFeatures&& features, const std::optional<TDuration> dWait) {
if (!UsedPortions.emplace(info->GetAddress()).second) {
return true;
return EAddPortionResult::PORTION_LOCKED;
}
if (DataLocksManager->IsLocked(*info, NDataLocks::ELockCategory::Actualization)) {
return true;
return EAddPortionResult::PORTION_LOCKED;
}

const auto buildNewTask = [&]() {
Expand All @@ -40,7 +40,7 @@ bool TTieringProcessContext::AddPortion(
if (Controller->IsNewTaskAvailable(it->first, it->second.size())) {
it->second.emplace_back(buildNewTask());
} else {
return false;
return EAddPortionResult::TASK_LIMIT_EXCEEDED;
}
features.OnSkipPortionWithProcessMemory(Counters, *dWait);
}
Expand All @@ -49,7 +49,7 @@ bool TTieringProcessContext::AddPortion(
if (Controller->IsNewTaskAvailable(it->first, it->second.size())) {
it->second.emplace_back(buildNewTask());
} else {
return false;
return EAddPortionResult::TASK_LIMIT_EXCEEDED;
}
features.OnSkipPortionWithTxLimit(Counters, *dWait);
}
Expand All @@ -70,7 +70,7 @@ bool TTieringProcessContext::AddPortion(
it->second.back().GetTask()->AddPortionToEvict(info, std::move(features));
AFL_VERIFY(!it->second.back().GetTask()->GetPortionsToRemove().HasPortions())("rw", features.GetRWAddress().DebugString())("f", it->first.DebugString());
}
return true;
return EAddPortionResult::SUCCESS;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,13 @@ class TTaskConstructor {
};

class TTieringProcessContext {
public:
// Outcome of TTieringProcessContext::AddPortion.
enum class EAddPortionResult {
// The portion was added to an eviction task.
SUCCESS = 0,
// Controller->IsNewTaskAvailable() refused a new task; the actualizers that call AddPortion stop iterating on this value.
TASK_LIMIT_EXCEEDED,
// The portion was skipped: its address is already in UsedPortions or the data locks manager reports it locked for actualization.
PORTION_LOCKED,
};

private:
const TVersionedIndex& VersionedIndex;
THashSet<TPortionAddress> UsedPortions;
Expand Down Expand Up @@ -79,7 +86,7 @@ class TTieringProcessContext {
return result;
}

bool AddPortion(const std::shared_ptr<const TPortionInfo>& info, TPortionEvictionFeatures&& features, const std::optional<TDuration> dWait);
EAddPortionResult AddPortion(const std::shared_ptr<const TPortionInfo>& info, TPortionEvictionFeatures&& features, const std::optional<TDuration> dWait);

bool IsRWAddressAvailable(const TRWAddress& address) const {
auto it = Tasks.find(address);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,21 @@ void TSchemeActualizer::DoExtractTasks(TTieringProcessContext& tasksContext, con
TPortionEvictionFeatures features(portionScheme, info->GetTargetScheme(), portion->GetTierNameDef(IStoragesManager::DefaultStorageId));
features.SetTargetTierName(portion->GetTierNameDef(IStoragesManager::DefaultStorageId));

if (!tasksContext.AddPortion(portion, std::move(features), {})) {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_ACTUALIZATION)("event", "cannot_add_portion")("context", tasksContext.DebugString());
bool limitExceeded = false;
switch (tasksContext.AddPortion(portion, std::move(features), {})) {
case TTieringProcessContext::EAddPortionResult::TASK_LIMIT_EXCEEDED:
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_ACTUALIZATION)("event", "cannot_add_portion")("reason", "limit_exceeded")(
"context", tasksContext.DebugString());
limitExceeded = true;
break;
case TTieringProcessContext::EAddPortionResult::PORTION_LOCKED:
break;
case TTieringProcessContext::EAddPortionResult::SUCCESS:
portionsToRemove.emplace(portion->GetPortionId());
break;
}
if (limitExceeded) {
break;
} else {
portionsToRemove.emplace(portion->GetPortionId());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,11 +185,18 @@ void TTieringActualizer::DoExtractTasks(
portionScheme, info->GetTargetScheme(), portion->GetTierNameDef(IStoragesManager::DefaultStorageId));
features.SetTargetTierName(info->GetTargetTierName());

if (!tasksContext.AddPortion(portion, std::move(features), info->GetLateness())) {
limitEnriched = true;
switch (tasksContext.AddPortion(portion, std::move(features), info->GetLateness())) {
case TTieringProcessContext::EAddPortionResult::TASK_LIMIT_EXCEEDED:
limitEnriched = true;
break;
case TTieringProcessContext::EAddPortionResult::PORTION_LOCKED:
break;
case TTieringProcessContext::EAddPortionResult::SUCCESS:
AFL_VERIFY(portionIds.emplace(portion->GetPortionId()).second);
break;
}
if (limitEnriched) {
break;
} else {
portionIds.emplace(portion->GetPortionId());
}
}
if (limitEnriched) {
Expand Down
20 changes: 18 additions & 2 deletions ydb/core/tx/columnshard/hooks/abstract/abstract.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ class TDataAccessorsResult;
namespace NIndexes {
class TIndexMetaContainer;
}
namespace NDataLocks {
class ILock;
}
} // namespace NKikimr::NOlap
namespace arrow {
class RecordBatch;
Expand Down Expand Up @@ -330,6 +333,9 @@ class ICSController {
Y_UNUSED(txInfo);
}

// Hook that lets a controller supply additional data locks, keyed by lock name.
// The default implementation returns an empty map; derived controllers may override it.
virtual THashMap<TString, std::shared_ptr<NKikimr::NOlap::NDataLocks::ILock>> GetExternalDataLocks() const {
return {};
}
};

class TControllers {
Expand All @@ -338,7 +344,7 @@ class TControllers {

public:
template <class TController>
class TGuard: TNonCopyable {
class TGuard: TMoveOnly {
private:
std::shared_ptr<TController> Controller;

Expand All @@ -348,12 +354,22 @@ class TControllers {
Y_ABORT_UNLESS(Controller);
}

// Move constructor: takes over the controller held by `other` by delegating to the
// controller-taking constructor, then clears other.Controller so that the moved-from guard
// no longer refers to it.
// NOTE(review): the delegated constructor appears to Y_ABORT_UNLESS on a null controller, so
// moving from an already moved-from guard would abort - confirm.
TGuard(TGuard&& other)
: TGuard(other.Controller) {
other.Controller = nullptr;
}
// Move assignment: exchanges the guarded controller with `other`, so this guard ends up
// holding other's controller and `other` ends up holding the one this guard held before.
// Returns a reference to this guard.
// NOTE(review): because this is a swap, `other` may leave with a non-null controller and its
// destructor may then reset the global controller - confirm that this is intended.
TGuard& operator=(TGuard&& other) {
    std::swap(Controller, other.Controller);
    // The operator is declared to return TGuard&; flowing off the end without a return
    // statement is undefined behaviour.
    return *this;
}

// Gives access to the guarded controller. Yields nullptr when the guard holds no controller
// (for example after it has been moved from).
TController* operator->() {
return Controller.get();
}

~TGuard() {
Singleton<TControllers>()->CSController = std::make_shared<ICSController>();
if (Controller) {
Singleton<TControllers>()->CSController = std::make_shared<ICSController>();
}
}
};

Expand Down
21 changes: 19 additions & 2 deletions ydb/core/tx/columnshard/hooks/testing/controller.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ class TController: public TReadOnlyController {
TMutex ActiveTabletsMutex;
std::set<ui64> ActiveTablets;

THashMap<TString, std::shared_ptr<NOlap::NDataLocks::ILock>> ExternalLocks;

class TBlobInfo {
private:
const NOlap::TUnifiedBlobId BlobId;
Expand Down Expand Up @@ -218,6 +220,11 @@ class TController: public TReadOnlyController {
SharingIds.emplace(sessionId);
}

// Returns a copy of the externally registered locks (see RegisterLock / UnregisterLock).
// The copy is taken while holding Mutex.
virtual THashMap<TString, std::shared_ptr<NOlap::NDataLocks::ILock>> GetExternalDataLocks() const override {
TGuard<TMutex> g(Mutex);
return ExternalLocks;
}

public:
virtual bool CheckPortionsToMergeOnCompaction(const ui64 /*memoryAfterAdd*/, const ui32 currentSubsetsCount) override {
return currentSubsetsCount > 1;
Expand Down Expand Up @@ -266,6 +273,16 @@ class TController: public TReadOnlyController {
CompactionControl = value;
}

// Adds an external lock under the given name while holding Mutex.
// The name must not be registered yet: AFL_VERIFY checks that the insertion took place.
void RegisterLock(const TString& name, const std::shared_ptr<NOlap::NDataLocks::ILock>& lock) {
TGuard<TMutex> g(Mutex);
AFL_VERIFY(ExternalLocks.emplace(name, lock).second)("name", name);
}

// Removes the external lock registered under the given name while holding Mutex.
// The name must be registered: AFL_VERIFY checks that an entry was actually erased.
void UnregisterLock(const TString& name) {
TGuard<TMutex> g(Mutex);
AFL_VERIFY(ExternalLocks.erase(name))("name", name);
}

bool HasPKSortingOnly() const;

void OnSwitchToWork(const ui64 tabletId) override {
Expand All @@ -292,8 +309,8 @@ class TController: public TReadOnlyController {
RestartOnLocalDbTxCommitted = std::move(txInfo);
}

virtual void OnAfterLocalTxCommitted(const NActors::TActorContext& ctx, const ::NKikimr::NColumnShard::TColumnShard& shard, const TString& txInfo) override;

virtual void OnAfterLocalTxCommitted(
const NActors::TActorContext& ctx, const ::NKikimr::NColumnShard::TColumnShard& shard, const TString& txInfo) override;
};

}
13 changes: 13 additions & 0 deletions ydb/core/tx/columnshard/hooks/testing/ro_controller.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,19 @@ class TReadOnlyController: public ICSController {
}
}

void WaitTtl(const TDuration d) const {
TInstant start = TInstant::Now();
ui32 countStart = GetTTLStartedCounter().Val();
while (Now() - start < d) {
if (countStart != GetTTLStartedCounter().Val()) {
countStart = GetTTLStartedCounter().Val();
start = TInstant::Now();
}
Cerr << "WAIT_TTL: " << GetTTLStartedCounter().Val() << Endl;
Sleep(TDuration::Seconds(1));
}
}

template <class TTester>
void WaitCondition(const TDuration d, const TTester& test) const {
const TInstant start = TInstant::Now();
Expand Down

0 comments on commit 5f5960b

Please sign in to comment.