Skip to content

Commit

Permalink
Deprecate the legacy query ctx creation code (#9864)
Browse files Browse the repository at this point in the history
Summary: Pull Request resolved: #9864

Reviewed By: tanjialiang

Differential Revision: D57539994

Pulled By: xiaoxmeng

fbshipit-source-id: b1b24cf00607da750f692ed911e9e935560c6a04
  • Loading branch information
xiaoxmeng authored and facebook-github-bot committed May 21, 2024
1 parent e70412d commit a0222ab
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 97 deletions.
2 changes: 1 addition & 1 deletion pyvelox/context.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ struct PyVeloxContext {
std::shared_ptr<facebook::velox::memory::MemoryPool> pool_ =
facebook::velox::memory::deprecatedAddDefaultLeafMemoryPool();
std::shared_ptr<facebook::velox::core::QueryCtx> queryCtx_ =
std::make_shared<facebook::velox::core::QueryCtx>();
facebook::velox::core::QueryCtx::create();
std::unique_ptr<facebook::velox::core::ExecCtx> execCtx_ =
std::make_unique<facebook::velox::core::ExecCtx>(
pool_.get(),
Expand Down
36 changes: 0 additions & 36 deletions velox/core/QueryCtx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,25 +39,6 @@ namespace facebook::velox::core {
return queryCtx;
}

/*static*/ std::shared_ptr<QueryCtx> QueryCtx::create(
folly::Executor::KeepAlive<> executorKeepalive,
std::unordered_map<std::string, std::string> queryConfigValues,
std::unordered_map<std::string, std::shared_ptr<Config>>
connectorSessionProperties,
cache::AsyncDataCache* cache,
std::shared_ptr<memory::MemoryPool> pool,
const std::string& queryId) {
std::shared_ptr<QueryCtx> queryCtx(new QueryCtx(
executorKeepalive,
std::move(queryConfigValues),
std::move(connectorSessionProperties),
cache,
std::move(pool),
queryId));
queryCtx->maybeSetReclaimer();
return queryCtx;
}

QueryCtx::QueryCtx(
folly::Executor* executor,
QueryConfig&& queryConfig,
Expand All @@ -77,23 +58,6 @@ QueryCtx::QueryCtx(
initPool(queryId);
}

QueryCtx::QueryCtx(
folly::Executor::KeepAlive<> executorKeepalive,
std::unordered_map<std::string, std::string> queryConfigValues,
std::unordered_map<std::string, std::shared_ptr<Config>>
connectorSessionProperties,
cache::AsyncDataCache* cache,
std::shared_ptr<memory::MemoryPool> pool,
const std::string& queryId)
: queryId_(queryId),
cache_(cache),
connectorSessionProperties_(connectorSessionProperties),
pool_(std::move(pool)),
executorKeepalive_(std::move(executorKeepalive)),
queryConfig_{std::move(queryConfigValues)} {
initPool(queryId);
}

/*static*/ std::string QueryCtx::generatePoolName(const std::string& queryId) {
// We attach a monotonically increasing sequence number to ensure the pool
// name is unique.
Expand Down
70 changes: 20 additions & 50 deletions velox/core/QueryCtx.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,49 +49,6 @@ class QueryCtx : public std::enable_shared_from_this<QueryCtx> {
folly::Executor* spillExecutor = nullptr,
const std::string& queryId = "");

/// Constructor to block the destruction of executor while this object is
/// alive.
///
/// This constructor does not keep the ownership of executor.
static std::shared_ptr<QueryCtx> create(
folly::Executor::KeepAlive<> executorKeepalive,
std::unordered_map<std::string, std::string> queryConfigValues = {},
std::unordered_map<std::string, std::shared_ptr<Config>>
connectorConfigs = {},
cache::AsyncDataCache* cache = cache::AsyncDataCache::getInstance(),
std::shared_ptr<memory::MemoryPool> pool = nullptr,
const std::string& queryId = "");

/// QueryCtx is used in different places. When used with `Task::start()`, it's
/// required that the caller supplies the executor and ensure its lifetime
/// outlives the tasks that use it. In contrast, when used in expression
/// evaluation through `ExecCtx` or 'Task::next()' for single thread execution
/// mode, executor is not needed. Hence, we don't require executor to always
/// be passed in here, but instead, ensure that executor exists when actually
/// being used.
QueryCtx(
folly::Executor* executor = nullptr,
QueryConfig&& queryConfig = QueryConfig{{}},
std::unordered_map<std::string, std::shared_ptr<Config>>
connectorConfigs = {},
cache::AsyncDataCache* cache = cache::AsyncDataCache::getInstance(),
std::shared_ptr<memory::MemoryPool> pool = nullptr,
folly::Executor* spillExecutor = nullptr,
const std::string& queryId = "");

/// Constructor to block the destruction of executor while this
/// object is alive.
///
/// This constructor does not keep the ownership of executor.
explicit QueryCtx(
folly::Executor::KeepAlive<> executorKeepalive,
std::unordered_map<std::string, std::string> queryConfigValues = {},
std::unordered_map<std::string, std::shared_ptr<Config>>
connectorConfigs = {},
cache::AsyncDataCache* cache = cache::AsyncDataCache::getInstance(),
std::shared_ptr<memory::MemoryPool> pool = nullptr,
const std::string& queryId = "");

static std::string generatePoolName(const std::string& queryId);

memory::MemoryPool* pool() const {
Expand All @@ -103,15 +60,12 @@ class QueryCtx : public std::enable_shared_from_this<QueryCtx> {
}

folly::Executor* executor() const {
VELOX_CHECK(isExecutorSupplied(), "Executor was not supplied.");
if (executor_ != nullptr) {
return executor_;
}
return executorKeepalive_.get();
return executor_;
;
}

bool isExecutorSupplied() const {
return executor_ != nullptr || executorKeepalive_.get() != nullptr;
return executor_ != nullptr;
}

const QueryConfig& queryConfig() const {
Expand Down Expand Up @@ -171,6 +125,23 @@ class QueryCtx : public std::enable_shared_from_this<QueryCtx> {
}

private:
/// QueryCtx is used in different places. When used with `Task::start()`, it's
/// required that the caller supplies the executor and ensure its lifetime
/// outlives the tasks that use it. In contrast, when used in expression
/// evaluation through `ExecCtx` or 'Task::next()' for single thread execution
/// mode, executor is not needed. Hence, we don't require executor to always
/// be passed in here, but instead, ensure that executor exists when actually
/// being used.
QueryCtx(
folly::Executor* executor = nullptr,
QueryConfig&& queryConfig = QueryConfig{{}},
std::unordered_map<std::string, std::shared_ptr<Config>>
connectorConfigs = {},
cache::AsyncDataCache* cache = cache::AsyncDataCache::getInstance(),
std::shared_ptr<memory::MemoryPool> pool = nullptr,
folly::Executor* spillExecutor = nullptr,
const std::string& queryId = "");

class MemoryReclaimer : public memory::MemoryReclaimer {
public:
static std::unique_ptr<memory::MemoryReclaimer> create(
Expand Down Expand Up @@ -233,7 +204,6 @@ class QueryCtx : public std::enable_shared_from_this<QueryCtx> {
std::unordered_map<std::string, std::shared_ptr<Config>>
connectorSessionProperties_;
std::shared_ptr<memory::MemoryPool> pool_;
folly::Executor::KeepAlive<> executorKeepalive_;
QueryConfig queryConfig_;
std::atomic<uint64_t> numSpilledBytes_{0};

Expand Down
2 changes: 1 addition & 1 deletion velox/core/tests/PlanFragmentTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ class PlanFragmentTest : public testing::Test {
{QueryConfig::kOrderBySpillEnabled,
orderBySpillEnabled ? "true" : "false"},
});
return QueryCtx::create(nullptr, std::move(configData));
return QueryCtx::create(nullptr, QueryConfig{std::move(configData)});
}

RowTypePtr rowType_;
Expand Down
13 changes: 7 additions & 6 deletions velox/core/tests/QueryConfigTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ class QueryConfigTest : public testing::Test {
};

TEST_F(QueryConfigTest, emptyConfig) {
std::unordered_map<std::string, std::string> configData;
auto queryCtx = QueryCtx::create(nullptr, std::move(configData));
auto queryCtx = QueryCtx::create(nullptr, QueryConfig{{}});
const QueryConfig& config = queryCtx->queryConfig();

ASSERT_FALSE(config.isLegacyCast());
Expand All @@ -40,7 +39,7 @@ TEST_F(QueryConfigTest, setConfig) {
std::string path = "/tmp/setConfig";
std::unordered_map<std::string, std::string> configData(
{{QueryConfig::kLegacyCast, "true"}});
auto queryCtx = QueryCtx::create(nullptr, std::move(configData));
auto queryCtx = QueryCtx::create(nullptr, QueryConfig{std::move(configData)});
const QueryConfig& config = queryCtx->queryConfig();

ASSERT_TRUE(config.isLegacyCast());
Expand All @@ -50,7 +49,7 @@ TEST_F(QueryConfigTest, invalidConfig) {
std::unordered_map<std::string, std::string> configData(
{{QueryConfig::kSessionTimezone, "Invalid"}});
VELOX_ASSERT_USER_THROW(
QueryCtx::create(nullptr, std::move(configData)),
QueryCtx::create(nullptr, QueryConfig{std::move(configData)}),
"Unknown time zone: 'Invalid'");

auto queryCtx = QueryCtx::create(nullptr);
Expand Down Expand Up @@ -135,7 +134,8 @@ TEST_F(QueryConfigTest, taskWriterCountConfig) {
QueryConfig::kTaskPartitionedWriterCount,
std::to_string(testConfig.numPartitionedWriterCounter.value()));
}
auto queryCtx = QueryCtx::create(nullptr, std::move(configData));
auto queryCtx =
QueryCtx::create(nullptr, QueryConfig{std::move(configData)});
const QueryConfig& config = queryCtx->queryConfig();
ASSERT_EQ(config.taskWriterCount(), testConfig.expectedWriterCounter);
ASSERT_EQ(
Expand All @@ -153,7 +153,8 @@ TEST_F(QueryConfigTest, enableExpressionEvaluationCacheConfig) {
std::unordered_map<std::string, std::string> configData(
{{core::QueryConfig::kEnableExpressionEvaluationCache,
enableExpressionEvaluationCache ? "true" : "false"}});
auto queryCtx = core::QueryCtx::create(nullptr, std::move(configData));
auto queryCtx =
core::QueryCtx::create(nullptr, QueryConfig{std::move(configData)});
const core::QueryConfig& config = queryCtx->queryConfig();
ASSERT_EQ(
config.isExpressionEvaluationCacheEnabled(),
Expand Down
9 changes: 6 additions & 3 deletions velox/exec/tests/DriverTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1518,7 +1518,8 @@ DEBUG_ONLY_TEST_F(DriverTest, driverCpuTimeSlicingCheck) {
"t0",
fragment,
0,
core::QueryCtx::create(driverExecutor_.get(), std::move(queryConfig)),
core::QueryCtx::create(
driverExecutor_.get(), core::QueryConfig{std::move(queryConfig)}),
testParam.executionMode,
[](RowVectorPtr /*unused*/, ContinueFuture* /*unused*/) {
return exec::BlockingReason::kNotBlocked;
Expand All @@ -1529,7 +1530,8 @@ DEBUG_ONLY_TEST_F(DriverTest, driverCpuTimeSlicingCheck) {
"t0",
fragment,
0,
core::QueryCtx::create(driverExecutor_.get(), std::move(queryConfig)),
core::QueryCtx::create(
driverExecutor_.get(), core::QueryConfig{std::move(queryConfig)}),
testParam.executionMode);
while (task->next() != nullptr) {
}
Expand Down Expand Up @@ -1601,7 +1603,8 @@ TEST_F(OpCallStatusTest, basic) {
"t19",
fragment,
0,
core::QueryCtx::create(driverExecutor_.get(), std::move(queryConfig)),
core::QueryCtx::create(
driverExecutor_.get(), core::QueryConfig{std::move(queryConfig)}),
Task::ExecutionMode::kParallel,
[](RowVectorPtr /*unused*/, ContinueFuture* /*unused*/) {
return exec::BlockingReason::kNotBlocked;
Expand Down

0 comments on commit a0222ab

Please sign in to comment.