Skip to content

Commit

Permalink
[Enhancement] support phased schedule
Browse files Browse the repository at this point in the history
Signed-off-by: stdpain <[email protected]>
  • Loading branch information
stdpain committed Jul 17, 2024
1 parent 12adbd4 commit cb8113c
Show file tree
Hide file tree
Showing 54 changed files with 2,034 additions and 101 deletions.
2 changes: 2 additions & 0 deletions be/src/exec/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ set(LIBRARY_OUTPUT_PATH "${BUILD_DIR}/src/exec")
set(EXECUTABLE_OUTPUT_PATH "${BUILD_DIR}/src/exec")

set(EXEC_FILES
capture_version_node.cpp
data_sink.cpp
empty_set_node.cpp
exec_node.cpp
Expand Down Expand Up @@ -166,6 +167,7 @@ set(EXEC_FILES
sorting/sort_column.cpp
sorting/sort_permute.cpp
connector_scan_node.cpp
pipeline/capture_version_operator.cpp
pipeline/exchange/exchange_merge_sort_source_operator.cpp
pipeline/exchange/exchange_parallel_merge_source_operator.cpp
pipeline/exchange/exchange_sink_operator.cpp
Expand Down
36 changes: 36 additions & 0 deletions be/src/exec/capture_version_node.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "exec/capture_version_node.h"

#include "exec/pipeline/capture_version_operator.h"
#include "exec/pipeline/pipeline_builder.h"

namespace starrocks {

pipeline::OpFactories CaptureVersionNode::decompose_to_pipeline(pipeline::PipelineBuilderContext* context) {
return OpFactories{std::make_shared<pipeline::CaptureVersionOpFactory>(context->next_operator_id(), id())};
}

StatusOr<pipeline::MorselQueueFactoryPtr> CaptureVersionNode::scan_range_to_morsel_queue_factory(
const std::vector<TScanRangeParams>& scan_ranges) {
pipeline::Morsels morsels;
for (const auto& scan_range : scan_ranges) {
morsels.emplace_back(std::make_unique<pipeline::ScanMorsel>(id(), scan_range));
}
auto morsel_queue = std::make_unique<pipeline::FixedMorselQueue>(std::move(morsels));
return std::make_unique<pipeline::SharedMorselQueueFactory>(std::move(morsel_queue), 1);
}

} // namespace starrocks
32 changes: 32 additions & 0 deletions be/src/exec/capture_version_node.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include "exec/exec_node.h"
#include "exec/scan_node.h"

namespace starrocks {
class CaptureVersionNode final : public ExecNode {
public:
CaptureVersionNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
: ExecNode(pool, tnode, descs) {}
~CaptureVersionNode() override = default;

StatusOr<pipeline::MorselQueueFactoryPtr> scan_range_to_morsel_queue_factory(
const std::vector<TScanRangeParams>& global_scan_ranges);

pipeline::OpFactories decompose_to_pipeline(pipeline::PipelineBuilderContext* context) override;
};
} // namespace starrocks
6 changes: 6 additions & 0 deletions be/src/exec/exec_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
#include "exec/aggregate/distinct_streaming_node.h"
#include "exec/analytic_node.h"
#include "exec/assert_num_rows_node.h"
#include "exec/capture_version_node.h"
#include "exec/connector_scan_node.h"
#include "exec/cross_join_node.h"
#include "exec/dict_decode_node.h"
Expand All @@ -74,6 +75,7 @@
#include "exec/union_node.h"
#include "exprs/dictionary_get_expr.h"
#include "exprs/expr_context.h"
#include "gen_cpp/PlanNodes_types.h"
#include "gutil/strings/substitute.h"
#include "runtime/descriptors.h"
#include "runtime/exec_env.h"
Expand Down Expand Up @@ -564,6 +566,10 @@ Status ExecNode::create_vectorized_node(starrocks::RuntimeState* state, starrock
*node = pool->add(new StreamAggregateNode(pool, tnode, descs));
return Status::OK();
}
case TPlanNodeType::CAPTURE_VERSION_NODE: {
*node = pool->add(new CaptureVersionNode(pool, tnode, descs));
return Status::OK();
}
default:
return Status::InternalError(strings::Substitute("Vectorized engine not support node: $0", tnode.node_type));
}
Expand Down
17 changes: 17 additions & 0 deletions be/src/exec/olap_scan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -753,6 +753,23 @@ StatusOr<TabletSharedPtr> OlapScanNode::get_tablet(const TInternalScanRange* sca
return tablet;
}

StatusOr<std::vector<RowsetSharedPtr>> OlapScanNode::capture_tablet_rowsets(const TabletSharedPtr& tablet,
const TInternalScanRange* scan_range) {
std::vector<RowsetSharedPtr> rowsets;
if (scan_range->__isset.gtid) {
std::shared_lock l(tablet->get_header_lock());
RETURN_IF_ERROR(tablet->capture_consistent_rowsets(scan_range->gtid, &rowsets));
Rowset::acquire_readers(rowsets);
} else {
int64_t version = strtoul(scan_range->version.c_str(), nullptr, 10);
// Capture row sets of this version tablet.
std::shared_lock l(tablet->get_header_lock());
RETURN_IF_ERROR(tablet->capture_consistent_rowsets(Version(0, version), &rowsets));
Rowset::acquire_readers(rowsets);
}
return rowsets;
}

int OlapScanNode::estimated_max_concurrent_chunks() const {
// We temporarily assume that the memory tried in the storage layer
// is the same size as the chunk_size * _estimated_scan_row_bytes.
Expand Down
3 changes: 3 additions & 0 deletions be/src/exec/olap_scan_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ class OlapScanNode final : public starrocks::ScanNode {
int estimated_max_concurrent_chunks() const;

static StatusOr<TabletSharedPtr> get_tablet(const TInternalScanRange* scan_range);
static StatusOr<std::vector<RowsetSharedPtr>> capture_tablet_rowsets(const TabletSharedPtr& tablet,
const TInternalScanRange* scan_range);

static int compute_priority(int32_t num_submitted_tasks);

int io_tasks_per_scan_operator() const override {
Expand Down
52 changes: 52 additions & 0 deletions be/src/exec/pipeline/capture_version_operator.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "exec/pipeline/capture_version_operator.h"

#include "common/logging.h"
#include "exec/olap_scan_node.h"
#include "runtime/runtime_state.h"
#include "storage/rowset/rowset.h"
#include "util/defer_op.h"

namespace starrocks::pipeline {
Status CaptureVersionOperator::prepare(RuntimeState* state) {
RETURN_IF_ERROR(SourceOperator::prepare(state));
_capture_tablet_rowsets_timer = ADD_TIMER(_unique_metrics, "CaptureTabletRowsetsTime");

return Status::OK();
}

StatusOr<ChunkPtr> CaptureVersionOperator::pull_chunk(RuntimeState* state) {
// do capture rowset here
auto defer = DeferOp([this]() { _is_finished = true; });
std::vector<std::vector<RowsetSharedPtr>> tablet_rowsets;

{
SCOPED_TIMER(_capture_tablet_rowsets_timer);
auto scan_rages = _morsel_queue->prepare_olap_scan_ranges();
std::vector<std::vector<RowsetSharedPtr>> tablet_rowsets;
VLOG_QUERY << "capture rowset scan_ranges nums:" << scan_rages.size();
for (auto& scan_range : scan_rages) {
ASSIGN_OR_RETURN(TabletSharedPtr tablet, OlapScanNode::get_tablet(scan_range));
ASSIGN_OR_RETURN(auto rowset, OlapScanNode::capture_tablet_rowsets(tablet, scan_range));
tablet_rowsets.emplace_back(std::move(rowset));
}
//
_rowset_release_guard = MultiRowsetReleaseGuard(std::move(tablet_rowsets), adopt_acquire_t{});
}
return nullptr;
}

} // namespace starrocks::pipeline
60 changes: 60 additions & 0 deletions be/src/exec/pipeline/capture_version_operator.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include "exec/pipeline/source_operator.h"
#include "util/runtime_profile.h"

namespace starrocks::pipeline {

// CaptureVersion returns an empty result set.
class CaptureVersionOperator final : public SourceOperator {
public:
CaptureVersionOperator(OperatorFactory* factory, int32_t id, int32_t plan_node_id, int32_t driver_sequence)
: SourceOperator(factory, id, "capture_version", plan_node_id, false, driver_sequence) {}

~CaptureVersionOperator() override = default;

Status prepare(RuntimeState* state) override;

bool has_output() const override { return !_is_finished; }

bool is_finished() const override { return _is_finished; }

StatusOr<ChunkPtr> pull_chunk(RuntimeState* state) override;

private:
MultiRowsetReleaseGuard _rowset_release_guard;
RuntimeProfile::Counter* _capture_tablet_rowsets_timer = nullptr;
bool _is_finished{};
};

class CaptureVersionOpFactory final : public SourceOperatorFactory {
public:
CaptureVersionOpFactory(int32_t id, int32_t plan_node_id)
: SourceOperatorFactory(id, "capture_version", plan_node_id) {}

~CaptureVersionOpFactory() override = default;

OperatorPtr create(int32_t degree_of_parallelism, int32_t driver_sequence) override {
return std::make_shared<CaptureVersionOperator>(this, _id, _plan_node_id, driver_sequence);
}

bool with_morsels() const override { return true; }

SourceOperatorFactory::AdaptiveState adaptive_initial_state() const override { return AdaptiveState::ACTIVE; }
};

} // namespace starrocks::pipeline
36 changes: 36 additions & 0 deletions be/src/exec/pipeline/fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,20 @@

#include "exec/pipeline/fragment_context.h"

#include <memory>

#include "exec/data_sink.h"
#include "exec/pipeline/group_execution/execution_group.h"
#include "exec/pipeline/pipeline_driver_executor.h"
#include "exec/pipeline/stream_pipeline_driver.h"
#include "exec/workgroup/work_group.h"
#include "runtime/client_cache.h"
#include "runtime/data_stream_mgr.h"
#include "runtime/exec_env.h"
#include "runtime/stream_load/stream_load_context.h"
#include "runtime/stream_load/transaction_mgr.h"
#include "util/threadpool.h"
#include "util/thrift_rpc_helper.h"
#include "util/time.h"

namespace starrocks::pipeline {
Expand Down Expand Up @@ -85,6 +90,37 @@ void FragmentContext::count_down_execution_group(size_t val) {
auto status = final_status();
state->exec_env()->wg_driver_executor()->report_exec_state(query_ctx, this, status, true, true);

if (_report_when_finish) {
/// TODO: report fragment finish to BE coordinator
TReportFragmentFinishResponse res;
TReportFragmentFinishParams params;
params.__set_query_id(query_id());
params.__set_fragment_instance_id(fragment_instance_id());
// params.query_id = query_id();
// params.fragment_instance_id = fragment_instance_id();
const auto& fe_addr = state->fragment_ctx()->fe_addr();

class RpcRunnable : public Runnable {
public:
RpcRunnable(const TNetworkAddress& fe_addr, const TReportFragmentFinishResponse& res,
const TReportFragmentFinishParams& params)
: fe_addr(fe_addr), res(res), params(params) {}
const TNetworkAddress fe_addr;
TReportFragmentFinishResponse res;
const TReportFragmentFinishParams params;

void run() override {
(void)ThriftRpcHelper::rpc<FrontendServiceClient>(
fe_addr.hostname, fe_addr.port,
[&](FrontendServiceConnection& client) { client->reportFragmentFinish(res, params); });
}
};
//
std::shared_ptr<Runnable> runnable;
runnable = std::make_shared<RpcRunnable>(fe_addr, res, params);
(void)state->exec_env()->streaming_load_thread_pool()->submit(runnable);
}

destroy_pass_through_chunk_buffer();

query_ctx->count_down_fragments();
Expand Down
4 changes: 4 additions & 0 deletions be/src/exec/pipeline/fragment_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,8 @@ class FragmentContext {
bool enable_group_execution() const { return _enable_group_execution; }
void set_enable_group_execution(bool enable_group_execution) { _enable_group_execution = enable_group_execution; }

void set_report_when_finish(bool report) { _report_when_finish = report; }

private:
bool _enable_group_execution = false;
// Id of this query
Expand Down Expand Up @@ -227,6 +229,8 @@ class FragmentContext {

RuntimeProfile::Counter* _jit_counter = nullptr;
RuntimeProfile::Counter* _jit_timer = nullptr;

bool _report_when_finish{};
};

class FragmentContextManager {
Expand Down
18 changes: 17 additions & 1 deletion be/src/exec/pipeline/fragment_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <unordered_map>

#include "common/config.h"
#include "exec/capture_version_node.h"
#include "exec/cross_join_node.h"
#include "exec/exchange_node.h"
#include "exec/exec_node.h"
Expand Down Expand Up @@ -103,6 +104,9 @@ Status FragmentExecutor::_prepare_query_ctx(ExecEnv* exec_env, const UnifiedExec
}

_query_ctx = exec_env->query_context_mgr()->get_or_register(query_id);
if (_query_ctx == nullptr) {
return Status::Cancelled("Query has been cancelled");
}
_query_ctx->set_exec_env(exec_env);
if (params.__isset.instances_number) {
_query_ctx->set_total_fragments(params.instances_number);
Expand Down Expand Up @@ -254,6 +258,8 @@ Status FragmentExecutor::_prepare_runtime_state(ExecEnv* exec_env, const Unified
exec_env->runtime_filter_worker()->open_query(query_id, query_options, *runtime_filter_params, true);
}
_fragment_ctx->prepare_pass_through_chunk_buffer();
_fragment_ctx->set_report_when_finish(request.unique().params.__isset.report_when_finish &&
request.unique().params.report_when_finish);

auto* obj_pool = runtime_state->obj_pool();
// Set up desc tbl
Expand Down Expand Up @@ -506,6 +512,15 @@ Status FragmentExecutor::_prepare_exec_plan(ExecEnv* exec_env, const UnifiedExec
}
}

std::vector<ExecNode*> capture_version_nodes;
plan->collect_nodes(TPlanNodeType::CAPTURE_VERSION_NODE, &capture_version_nodes);
for (auto* node : capture_version_nodes) {
const std::vector<TScanRangeParams>& scan_ranges = request.scan_ranges_of_node(node->id());
ASSIGN_OR_RETURN(auto morsel_queue_factory,
down_cast<CaptureVersionNode*>(node)->scan_range_to_morsel_queue_factory(scan_ranges));
morsel_queue_factories.emplace(node->id(), std::move(morsel_queue_factory));
}

if (_wg && _wg->big_query_scan_rows_limit() > 0) {
if (logical_scan_limit >= 0 && logical_scan_limit <= _wg->big_query_scan_rows_limit()) {
_query_ctx->set_scan_limit(std::max(_wg->big_query_scan_rows_limit(), physical_scan_limit));
Expand Down Expand Up @@ -747,7 +762,8 @@ Status FragmentExecutor::prepare(ExecEnv* exec_env, const TExecPlanFragmentParam
LOG(WARNING) << "Prepare fragment failed: " << print_id(request.common().params.query_id)
<< " fragment_instance_id=" << print_id(request.fragment_instance_id())
<< " is_stream_pipeline=" << request.is_stream_pipeline()
<< " backend_num=" << request.backend_num() << " fragment= " << request.common().fragment;
<< " backend_num=" << request.backend_num();
VLOG_QUERY << "Prepare fragment failed fragment=" << request.common().fragment;
}
});

Expand Down
Loading

0 comments on commit cb8113c

Please sign in to comment.