Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Enhancement] support phased schedule #47868

Merged
merged 1 commit into from
Jul 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions be/src/exec/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ set(LIBRARY_OUTPUT_PATH "${BUILD_DIR}/src/exec")
set(EXECUTABLE_OUTPUT_PATH "${BUILD_DIR}/src/exec")

set(EXEC_FILES
capture_version_node.cpp
data_sink.cpp
empty_set_node.cpp
exec_node.cpp
Expand Down Expand Up @@ -166,6 +167,7 @@ set(EXEC_FILES
sorting/sort_column.cpp
sorting/sort_permute.cpp
connector_scan_node.cpp
pipeline/capture_version_operator.cpp
pipeline/exchange/exchange_merge_sort_source_operator.cpp
pipeline/exchange/exchange_parallel_merge_source_operator.cpp
pipeline/exchange/exchange_sink_operator.cpp
Expand Down
36 changes: 36 additions & 0 deletions be/src/exec/capture_version_node.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "exec/capture_version_node.h"

#include "exec/pipeline/capture_version_operator.h"
#include "exec/pipeline/pipeline_builder.h"

namespace starrocks {

// Translates this plan node into pipeline operator factories.
//
// The result holds exactly one factory, a CaptureVersionOpFactory, created with
// the next operator id handed out by `context` and this node's plan-node id.
pipeline::OpFactories CaptureVersionNode::decompose_to_pipeline(pipeline::PipelineBuilderContext* context) {
    auto source_factory = std::make_shared<pipeline::CaptureVersionOpFactory>(context->next_operator_id(), id());
    return OpFactories{std::move(source_factory)};
}

// Wraps the scan ranges assigned to this node into a morsel queue factory.
//
// Each entry of `scan_ranges` becomes one ScanMorsel tagged with this node's id.
// The morsels are placed in a single FixedMorselQueue, which is then handed to a
// SharedMorselQueueFactory together with the literal 1.
// NOTE(review): the second factory argument is presumably the degree of
// parallelism -- confirm against SharedMorselQueueFactory.
StatusOr<pipeline::MorselQueueFactoryPtr> CaptureVersionNode::scan_range_to_morsel_queue_factory(
        const std::vector<TScanRangeParams>& scan_ranges) {
    pipeline::Morsels pending_morsels;
    for (const auto& range_params : scan_ranges) {
        auto morsel = std::make_unique<pipeline::ScanMorsel>(id(), range_params);
        pending_morsels.emplace_back(std::move(morsel));
    }

    auto fixed_queue = std::make_unique<pipeline::FixedMorselQueue>(std::move(pending_morsels));
    return std::make_unique<pipeline::SharedMorselQueueFactory>(std::move(fixed_queue), 1);
}

} // namespace starrocks
32 changes: 32 additions & 0 deletions be/src/exec/capture_version_node.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include "exec/exec_node.h"
#include "exec/scan_node.h"

namespace starrocks {
// Exec node instantiated for plan nodes of type TPlanNodeType::CAPTURE_VERSION_NODE.
//
// It adds nothing to ExecNode's construction. Its two member functions turn the
// node into pipeline operator factories and turn its scan ranges into a morsel
// queue factory.
class CaptureVersionNode final : public ExecNode {
public:
    CaptureVersionNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
            : ExecNode(pool, tnode, descs) {}

    ~CaptureVersionNode() override = default;

    // Returns the operator factories that represent this node in a pipeline.
    pipeline::OpFactories decompose_to_pipeline(pipeline::PipelineBuilderContext* context) override;

    // Builds the morsel queue factory that serves the given scan ranges to this
    // node's source operator.
    StatusOr<pipeline::MorselQueueFactoryPtr> scan_range_to_morsel_queue_factory(
            const std::vector<TScanRangeParams>& global_scan_ranges);
};
} // namespace starrocks
6 changes: 6 additions & 0 deletions be/src/exec/exec_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
#include "exec/aggregate/distinct_streaming_node.h"
#include "exec/analytic_node.h"
#include "exec/assert_num_rows_node.h"
#include "exec/capture_version_node.h"
#include "exec/connector_scan_node.h"
#include "exec/cross_join_node.h"
#include "exec/dict_decode_node.h"
Expand All @@ -74,6 +75,7 @@
#include "exec/union_node.h"
#include "exprs/dictionary_get_expr.h"
#include "exprs/expr_context.h"
#include "gen_cpp/PlanNodes_types.h"
#include "gutil/strings/substitute.h"
#include "runtime/descriptors.h"
#include "runtime/exec_env.h"
Expand Down Expand Up @@ -564,6 +566,10 @@ Status ExecNode::create_vectorized_node(starrocks::RuntimeState* state, starrock
*node = pool->add(new StreamAggregateNode(pool, tnode, descs));
return Status::OK();
}
case TPlanNodeType::CAPTURE_VERSION_NODE: {
*node = pool->add(new CaptureVersionNode(pool, tnode, descs));
return Status::OK();
}
default:
return Status::InternalError(strings::Substitute("Vectorized engine not support node: $0", tnode.node_type));
}
Expand Down
17 changes: 17 additions & 0 deletions be/src/exec/olap_scan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -753,6 +753,23 @@ StatusOr<TabletSharedPtr> OlapScanNode::get_tablet(const TInternalScanRange* sca
return tablet;
}

// Captures the rowsets of `tablet` that a scan range must read and acquires
// their readers.
//
// When the scan range carries a gtid, rowsets are captured by that gtid.
// Otherwise the decimal `version` string is parsed and the version range
// [0, version] is captured. Both paths hold the tablet header lock in shared mode
// while capturing and while acquiring the readers.
//
// Returns the captured rowsets, or the error reported by
// capture_consistent_rowsets (in which case no readers have been acquired here).
StatusOr<std::vector<RowsetSharedPtr>> OlapScanNode::capture_tablet_rowsets(const TabletSharedPtr& tablet,
                                                                            const TInternalScanRange* scan_range) {
    std::vector<RowsetSharedPtr> captured;
    const bool capture_by_gtid = scan_range->__isset.gtid;

    // The version string is parsed before the lock is taken and is only needed
    // on the non-gtid path.
    int64_t snapshot_version = 0;
    if (!capture_by_gtid) {
        snapshot_version = strtoul(scan_range->version.c_str(), nullptr, 10);
    }

    {
        std::shared_lock header_guard(tablet->get_header_lock());
        if (capture_by_gtid) {
            RETURN_IF_ERROR(tablet->capture_consistent_rowsets(scan_range->gtid, &captured));
        } else {
            // Capture row sets of this version tablet.
            RETURN_IF_ERROR(tablet->capture_consistent_rowsets(Version(0, snapshot_version), &captured));
        }
        Rowset::acquire_readers(captured);
    }
    return captured;
}

int OlapScanNode::estimated_max_concurrent_chunks() const {
// We temporarily assume that the memory tried in the storage layer
// is the same size as the chunk_size * _estimated_scan_row_bytes.
Expand Down
3 changes: 3 additions & 0 deletions be/src/exec/olap_scan_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ class OlapScanNode final : public starrocks::ScanNode {
int estimated_max_concurrent_chunks() const;

static StatusOr<TabletSharedPtr> get_tablet(const TInternalScanRange* scan_range);
static StatusOr<std::vector<RowsetSharedPtr>> capture_tablet_rowsets(const TabletSharedPtr& tablet,
const TInternalScanRange* scan_range);

static int compute_priority(int32_t num_submitted_tasks);

int io_tasks_per_scan_operator() const override {
Expand Down
52 changes: 52 additions & 0 deletions be/src/exec/pipeline/capture_version_operator.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "exec/pipeline/capture_version_operator.h"

#include "common/logging.h"
#include "exec/olap_scan_node.h"
#include "runtime/runtime_state.h"
#include "storage/rowset/rowset.h"
#include "util/defer_op.h"

namespace starrocks::pipeline {
// Prepares the operator: runs the SourceOperator preparation first, then
// registers the "CaptureTabletRowsetsTime" timer used by pull_chunk().
// Any error from the base-class preparation is returned unchanged.
Status CaptureVersionOperator::prepare(RuntimeState* state) {
    // The base class must be prepared before the profile counter is added.
    RETURN_IF_ERROR(SourceOperator::prepare(state));

    _capture_tablet_rowsets_timer = ADD_TIMER(_unique_metrics, "CaptureTabletRowsetsTime");
    return Status::OK();
}

// Captures the rowsets of every olap scan range in the morsel queue and stores
// them in `_rowset_release_guard`. The operator produces no data: a successful
// call returns a null chunk.
//
// The operator is marked finished when this call returns, whether it succeeded or
// failed, so it runs at most once per driver.
//
// Returns the error from get_tablet() or capture_tablet_rowsets() if any tablet
// fails. Rowsets captured for earlier tablets are still handed to the release
// guard in that case, so their reader references are not dropped without a
// matching release.
// NOTE(review): this relies on MultiRowsetReleaseGuard releasing adopted rowsets
// when it is destroyed or reassigned -- confirm against its definition.
StatusOr<ChunkPtr> CaptureVersionOperator::pull_chunk(RuntimeState* state) {
    // do capture rowset here
    auto defer = DeferOp([this]() { _is_finished = true; });

    {
        SCOPED_TIMER(_capture_tablet_rowsets_timer);
        auto scan_ranges = _morsel_queue->prepare_olap_scan_ranges();
        VLOG_QUERY << "capture rowset scan_ranges nums:" << scan_ranges.size();

        std::vector<std::vector<RowsetSharedPtr>> tablet_rowsets;
        // Hand over whatever has been captured on every exit path. The readers
        // were already acquired by capture_tablet_rowsets(), hence adopt_acquire_t.
        auto adopt_captured = DeferOp([this, &tablet_rowsets]() {
            _rowset_release_guard = MultiRowsetReleaseGuard(std::move(tablet_rowsets), adopt_acquire_t{});
        });

        for (auto& scan_range : scan_ranges) {
            ASSIGN_OR_RETURN(TabletSharedPtr tablet, OlapScanNode::get_tablet(scan_range));
            ASSIGN_OR_RETURN(auto rowset, OlapScanNode::capture_tablet_rowsets(tablet, scan_range));
            tablet_rowsets.emplace_back(std::move(rowset));
        }
    }
    return nullptr;
}

} // namespace starrocks::pipeline
60 changes: 60 additions & 0 deletions be/src/exec/pipeline/capture_version_operator.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include "exec/pipeline/source_operator.h"
#include "util/runtime_profile.h"

namespace starrocks::pipeline {

// Source operator registered under the name "capture_version".
//
// It returns an empty result set: its job is the rowset capture done in
// pull_chunk(), whose result is kept in `_rowset_release_guard` for as long as
// the operator holds it. The operator reports output as available until it has
// finished, and it finishes once pull_chunk() has run.
class CaptureVersionOperator final : public SourceOperator {
public:
    CaptureVersionOperator(OperatorFactory* factory, int32_t id, int32_t plan_node_id, int32_t driver_sequence)
            : SourceOperator(factory, id, "capture_version", plan_node_id, false, driver_sequence) {}

    ~CaptureVersionOperator() override = default;

    // Registers the profile timer after preparing the base operator.
    Status prepare(RuntimeState* state) override;

    // Performs the rowset capture; see the definition for details.
    StatusOr<ChunkPtr> pull_chunk(RuntimeState* state) override;

    // True once pull_chunk() has run.
    bool is_finished() const override { return _is_finished; }

    // The exact complement of is_finished().
    bool has_output() const override { return !_is_finished; }

private:
    // Holds the rowsets captured by pull_chunk().
    MultiRowsetReleaseGuard _rowset_release_guard;
    // Measures the time spent capturing tablet rowsets; set in prepare().
    RuntimeProfile::Counter* _capture_tablet_rowsets_timer = nullptr;
    bool _is_finished = false;
};

// Factory producing CaptureVersionOperator instances for one plan node.
//
// The factory declares that it works with morsels and starts in the ACTIVE
// adaptive state.
class CaptureVersionOpFactory final : public SourceOperatorFactory {
public:
    CaptureVersionOpFactory(int32_t id, int32_t plan_node_id)
            : SourceOperatorFactory(id, "capture_version", plan_node_id) {}

    ~CaptureVersionOpFactory() override = default;

    // Creates the operator for the given driver. `degree_of_parallelism` is not
    // used by this factory.
    OperatorPtr create(int32_t degree_of_parallelism, int32_t driver_sequence) override {
        auto capture_operator = std::make_shared<CaptureVersionOperator>(this, _id, _plan_node_id, driver_sequence);
        return capture_operator;
    }

    SourceOperatorFactory::AdaptiveState adaptive_initial_state() const override { return AdaptiveState::ACTIVE; }

    bool with_morsels() const override { return true; }
};

} // namespace starrocks::pipeline
36 changes: 36 additions & 0 deletions be/src/exec/pipeline/fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,20 @@

#include "exec/pipeline/fragment_context.h"

#include <memory>

#include "exec/data_sink.h"
#include "exec/pipeline/group_execution/execution_group.h"
#include "exec/pipeline/pipeline_driver_executor.h"
#include "exec/pipeline/stream_pipeline_driver.h"
#include "exec/workgroup/work_group.h"
#include "runtime/client_cache.h"
#include "runtime/data_stream_mgr.h"
#include "runtime/exec_env.h"
#include "runtime/stream_load/stream_load_context.h"
#include "runtime/stream_load/transaction_mgr.h"
#include "util/threadpool.h"
#include "util/thrift_rpc_helper.h"
#include "util/time.h"

namespace starrocks::pipeline {
Expand Down Expand Up @@ -85,6 +90,37 @@ void FragmentContext::count_down_execution_group(size_t val) {
auto status = final_status();
state->exec_env()->wg_driver_executor()->report_exec_state(query_ctx, this, status, true, true);

if (_report_when_finish) {
/// TODO: report fragment finish to BE coordinator
TReportFragmentFinishResponse res;
TReportFragmentFinishParams params;
params.__set_query_id(query_id());
params.__set_fragment_instance_id(fragment_instance_id());
// params.query_id = query_id();
// params.fragment_instance_id = fragment_instance_id();
const auto& fe_addr = state->fragment_ctx()->fe_addr();

class RpcRunnable : public Runnable {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggest moving RpcRunnable outside of this function, or using a lambda instead?
Although it is legal syntax, it looks somewhat weird.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will refactor this after #48233 is merged.

public:
RpcRunnable(const TNetworkAddress& fe_addr, const TReportFragmentFinishResponse& res,
const TReportFragmentFinishParams& params)
: fe_addr(fe_addr), res(res), params(params) {}
const TNetworkAddress fe_addr;
TReportFragmentFinishResponse res;
const TReportFragmentFinishParams params;

void run() override {
(void)ThriftRpcHelper::rpc<FrontendServiceClient>(
fe_addr.hostname, fe_addr.port,
[&](FrontendServiceConnection& client) { client->reportFragmentFinish(res, params); });
}
};
//
std::shared_ptr<Runnable> runnable;
runnable = std::make_shared<RpcRunnable>(fe_addr, res, params);
(void)state->exec_env()->streaming_load_thread_pool()->submit(runnable);
}

destroy_pass_through_chunk_buffer();

query_ctx->count_down_fragments();
Expand Down
4 changes: 4 additions & 0 deletions be/src/exec/pipeline/fragment_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,8 @@ class FragmentContext {
bool enable_group_execution() const { return _enable_group_execution; }
void set_enable_group_execution(bool enable_group_execution) { _enable_group_execution = enable_group_execution; }

// Enables or disables the fragment-finish report. When the flag is set,
// count_down_execution_group() submits a reportFragmentFinish RPC for this
// fragment instance.
void set_report_when_finish(bool report) {
    _report_when_finish = report;
}

private:
bool _enable_group_execution = false;
// Id of this query
Expand Down Expand Up @@ -227,6 +229,8 @@ class FragmentContext {

RuntimeProfile::Counter* _jit_counter = nullptr;
RuntimeProfile::Counter* _jit_timer = nullptr;

bool _report_when_finish{};
};

class FragmentContextManager {
Expand Down
18 changes: 17 additions & 1 deletion be/src/exec/pipeline/fragment_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <unordered_map>

#include "common/config.h"
#include "exec/capture_version_node.h"
#include "exec/cross_join_node.h"
#include "exec/exchange_node.h"
#include "exec/exec_node.h"
Expand Down Expand Up @@ -103,6 +104,9 @@ Status FragmentExecutor::_prepare_query_ctx(ExecEnv* exec_env, const UnifiedExec
}

_query_ctx = exec_env->query_context_mgr()->get_or_register(query_id);
if (_query_ctx == nullptr) {
return Status::Cancelled("Query has been cancelled");
}
_query_ctx->set_exec_env(exec_env);
if (params.__isset.instances_number) {
_query_ctx->set_total_fragments(params.instances_number);
Expand Down Expand Up @@ -254,6 +258,8 @@ Status FragmentExecutor::_prepare_runtime_state(ExecEnv* exec_env, const Unified
exec_env->runtime_filter_worker()->open_query(query_id, query_options, *runtime_filter_params, true);
}
_fragment_ctx->prepare_pass_through_chunk_buffer();
_fragment_ctx->set_report_when_finish(request.unique().params.__isset.report_when_finish &&
request.unique().params.report_when_finish);

auto* obj_pool = runtime_state->obj_pool();
// Set up desc tbl
Expand Down Expand Up @@ -506,6 +512,15 @@ Status FragmentExecutor::_prepare_exec_plan(ExecEnv* exec_env, const UnifiedExec
}
}

std::vector<ExecNode*> capture_version_nodes;
plan->collect_nodes(TPlanNodeType::CAPTURE_VERSION_NODE, &capture_version_nodes);
for (auto* node : capture_version_nodes) {
const std::vector<TScanRangeParams>& scan_ranges = request.scan_ranges_of_node(node->id());
ASSIGN_OR_RETURN(auto morsel_queue_factory,
down_cast<CaptureVersionNode*>(node)->scan_range_to_morsel_queue_factory(scan_ranges));
morsel_queue_factories.emplace(node->id(), std::move(morsel_queue_factory));
}

if (_wg && _wg->big_query_scan_rows_limit() > 0) {
if (logical_scan_limit >= 0 && logical_scan_limit <= _wg->big_query_scan_rows_limit()) {
_query_ctx->set_scan_limit(std::max(_wg->big_query_scan_rows_limit(), physical_scan_limit));
Expand Down Expand Up @@ -747,7 +762,8 @@ Status FragmentExecutor::prepare(ExecEnv* exec_env, const TExecPlanFragmentParam
LOG(WARNING) << "Prepare fragment failed: " << print_id(request.common().params.query_id)
<< " fragment_instance_id=" << print_id(request.fragment_instance_id())
<< " is_stream_pipeline=" << request.is_stream_pipeline()
<< " backend_num=" << request.backend_num() << " fragment= " << request.common().fragment;
<< " backend_num=" << request.backend_num();
VLOG_QUERY << "Prepare fragment failed fragment=" << request.common().fragment;
}
});

Expand Down
Loading
Loading