[Enhancement] support phased schedule
Signed-off-by: stdpain <[email protected]>
stdpain committed Jul 4, 2024
1 parent 245cb30 commit 318bbaa
Showing 28 changed files with 1,062 additions and 48 deletions.
36 changes: 36 additions & 0 deletions be/src/exec/pipeline/fragment_context.cpp
@@ -14,15 +14,20 @@

#include "exec/pipeline/fragment_context.h"

#include <memory>

#include "exec/data_sink.h"
#include "exec/pipeline/group_execution/execution_group.h"
#include "exec/pipeline/pipeline_driver_executor.h"
#include "exec/pipeline/stream_pipeline_driver.h"
#include "exec/workgroup/work_group.h"
#include "runtime/client_cache.h"
#include "runtime/data_stream_mgr.h"
#include "runtime/exec_env.h"
#include "runtime/stream_load/stream_load_context.h"
#include "runtime/stream_load/transaction_mgr.h"
#include "util/threadpool.h"
#include "util/thrift_rpc_helper.h"
#include "util/time.h"

namespace starrocks::pipeline {
@@ -85,6 +90,37 @@ void FragmentContext::count_down_execution_group(size_t val) {
auto status = final_status();
state->exec_env()->wg_driver_executor()->report_exec_state(query_ctx, this, status, true, true);

if (_report_when_finish) {
/// TODO: report fragment finish to BE coordinator
TReportFragmentFinishResponse res;
TReportFragmentFinishParams params;
params.__set_query_id(query_id());
params.__set_fragment_instance_id(fragment_instance_id());
const auto& fe_addr = state->fragment_ctx()->fe_addr();

class RpcRunnable : public Runnable {
public:
RpcRunnable(const TNetworkAddress& fe_addr, const TReportFragmentFinishResponse& res,
const TReportFragmentFinishParams& params)
: fe_addr(fe_addr), res(res), params(params) {}
const TNetworkAddress fe_addr;
TReportFragmentFinishResponse res;
const TReportFragmentFinishParams params;

void run() override {
(void)ThriftRpcHelper::rpc<FrontendServiceClient>(
fe_addr.hostname, fe_addr.port,
[&](FrontendServiceConnection& client) { client->reportFragmentFinish(res, params); });
}
};
std::shared_ptr<Runnable> runnable = std::make_shared<RpcRunnable>(fe_addr, res, params);
(void)state->exec_env()->streaming_load_thread_pool()->submit(runnable);
}

destroy_pass_through_chunk_buffer();

query_ctx->count_down_fragments();
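The report above is deliberately fire-and-forget: the runnable copies the FE address and the Thrift parameters, then goes to the streaming-load thread pool, so fragment teardown never blocks on the RPC round trip. A minimal sketch of the same pattern, with std::async standing in for the thread pool and a stub for the Thrift call (both illustrative, not the StarRocks API):

#include <future>
#include <iostream>
#include <string>

// Stub for the Thrift call; the real code goes through
// ThriftRpcHelper::rpc<FrontendServiceClient>(...).
void report_finish(const std::string& fe_host, int fe_port) {
    std::cout << "reportFragmentFinish -> " << fe_host << ":" << fe_port << "\n";
}

int main() {
    // Copy everything the task needs; the submitter may be destroyed
    // while the report is still in flight.
    std::string host = "127.0.0.1";
    int port = 9020;
    auto fut = std::async(std::launch::async, [host, port] { report_finish(host, port); });
    fut.wait();  // demo only; the real submit() never waits
}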
4 changes: 4 additions & 0 deletions be/src/exec/pipeline/fragment_context.h
@@ -169,6 +169,8 @@ class FragmentContext {
bool enable_group_execution() const { return _enable_group_execution; }
void set_enable_group_execution(bool enable_group_execution) { _enable_group_execution = enable_group_execution; }

void set_report_when_finish(bool report) { _report_when_finish = report; }

private:
bool _enable_group_execution = false;
// Id of this query
@@ -227,6 +229,8 @@ class FragmentContext {

RuntimeProfile::Counter* _jit_counter = nullptr;
RuntimeProfile::Counter* _jit_timer = nullptr;

bool _report_when_finish = false;
};

class FragmentContextManager {
2 changes: 2 additions & 0 deletions be/src/exec/pipeline/fragment_executor.cpp
@@ -254,6 +254,8 @@ Status FragmentExecutor::_prepare_runtime_state(ExecEnv* exec_env, const Unified
exec_env->runtime_filter_worker()->open_query(query_id, query_options, *runtime_filter_params, true);
}
_fragment_ctx->prepare_pass_through_chunk_buffer();
_fragment_ctx->set_report_when_finish(request.unique().params.__isset.report_when_finish &&
request.unique().params.report_when_finish);
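Both halves of the check above matter: Thrift optional fields carry a parallel __isset flag, and reading the value without consulting the flag silently yields the type's default. A standalone analogue of the guard (the struct is illustrative, not the generated Thrift type):

#include <iostream>

// Illustrative stand-in for a Thrift-generated struct with one optional field.
struct TParams {
    struct Isset { bool report_when_finish = false; } __isset;
    bool report_when_finish = false;  // meaningful only when the flag is set
};

int main() {
    TParams params;  // the sender never set the field
    bool report = params.__isset.report_when_finish && params.report_when_finish;
    std::cout << std::boolalpha << report << '\n';  // false by construction
}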

auto* obj_pool = runtime_state->obj_pool();
// Set up desc tbl
18 changes: 9 additions & 9 deletions be/src/exec/pipeline/scan/olap_scan_context.cpp
@@ -76,13 +76,12 @@ Status OlapScanContext::prepare(RuntimeState* state) {

void OlapScanContext::close(RuntimeState* state) {
_chunk_buffer.close();
_rowset_release_guard.reset();
}

Status OlapScanContext::capture_tablet_rowsets(const std::vector<TInternalScanRange*>& olap_scan_ranges) {
std::vector<std::vector<RowsetSharedPtr>> tablet_rowsets;
tablet_rowsets.resize(olap_scan_ranges.size());
_tablets.resize(olap_scan_ranges.size());
for (int i = 0; i < olap_scan_ranges.size(); ++i) {
auto* scan_range = olap_scan_ranges[i];
@@ -91,21 +90,22 @@ Status OlapScanContext::capture_tablet_rowsets(const std::vector<TInternalScanRa

if (scan_range->__isset.gtid) {
std::shared_lock l(tablet->get_header_lock());
RETURN_IF_ERROR(tablet->capture_consistent_rowsets(scan_range->gtid, &tablet_rowsets[i]));
Rowset::acquire_readers(tablet_rowsets[i]);
} else {
int64_t version = strtoul(scan_range->version.c_str(), nullptr, 10);
// Capture row sets of this version tablet.
std::shared_lock l(tablet->get_header_lock());
RETURN_IF_ERROR(tablet->capture_consistent_rowsets(Version(0, version), &tablet_rowsets[i]));
Rowset::acquire_readers(tablet_rowsets[i]);
}

VLOG(1) << "capture tablet rowsets: " << tablet->full_name() << ", rowsets: " << tablet_rowsets[i].size()
<< ", version: " << scan_range->version << ", gtid: " << scan_range->gtid;

_tablets[i] = std::move(tablet);
}
_rowset_release_guard = MultiRowsetReleaseGuard(std::move(tablet_rowsets), adopt_acquire_t{});

return Status::OK();
}
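Note the ownership hand-off above: readers are acquired into the function-local tablet_rowsets while each tablet's header lock is held, and the whole nested vector is adopted by _rowset_release_guard only after every scan range has been captured. The guard (see the TReleaseGuard definition in be/src/storage/rowset/rowset.h below) then releases all readers exactly once, either in its destructor or when close() calls reset().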
7 changes: 5 additions & 2 deletions be/src/exec/pipeline/scan/olap_scan_context.h
@@ -23,6 +23,7 @@
#include "exec/pipeline/context_with_dependency.h"
#include "exec/pipeline/scan/balanced_chunk_buffer.h"
#include "runtime/global_dict/parser.h"
#include "storage/rowset/rowset.h"
#include "util/phmap/phmap_fwd_decl.h"

namespace starrocks {
@@ -115,7 +116,9 @@ class OlapScanContext final : public ContextWithDependency {

Status capture_tablet_rowsets(const std::vector<TInternalScanRange*>& olap_scan_ranges);
const std::vector<TabletSharedPtr>& tablets() const { return _tablets; }
const std::vector<std::vector<RowsetSharedPtr>>& tablet_rowsets() const {
return _rowset_release_guard.tablet_rowsets();
};

const std::vector<ColumnAccessPathPtr>* column_access_paths() const;

@@ -148,7 +151,7 @@ class OlapScanContext final : public ContextWithDependency {
// of the left table are compacted while building the right hash table. Therefore, capture
// the rowsets into _rowset_release_guard in the preparation phase so they cannot be deleted.
std::vector<TabletSharedPtr> _tablets;
MultiRowsetReleaseGuard _rowset_release_guard;
ConcurrentJitRewriter& _jit_rewriter;
};

3 changes: 3 additions & 0 deletions be/src/service/internal_service.h
@@ -180,6 +180,9 @@ class PInternalServiceImplBase : public T {
const PProcessDictionaryCacheRequest* request,
PProcessDictionaryCacheResult* response, google::protobuf::Closure* done) override;

void report_fragment_finished(google::protobuf::RpcController* controller, const PFragmentFinishedRequest* req,
PFragmentFinishedResponse* res, google::protobuf::Closure* done) override {}

private:
void _transmit_chunk(::google::protobuf::RpcController* controller,
const ::starrocks::PTransmitChunkParams* request, ::starrocks::PTransmitChunkResult* response,
50 changes: 46 additions & 4 deletions be/src/storage/rowset/rowset.h
@@ -417,14 +417,56 @@ class Rowset : public std::enable_shared_from_this<Rowset>, public BaseRowset {
KeysType _keys_type;
};

struct adopt_acquire_t {
explicit adopt_acquire_t() = default;
};

template <class T>
class TReleaseGuard {
public:
explicit TReleaseGuard(T rowset) : _rowset(std::move(rowset)) { _rowset->acquire(); }
explicit TReleaseGuard(T rowset, adopt_acquire_t) : _rowset(std::move(rowset)) {}
~TReleaseGuard() {
if (_rowset) {
_rowset->release();
}
}

private:
T _rowset;
};

template <class T>
class TReleaseGuard<std::vector<std::vector<T>>> {
public:
TReleaseGuard() = default;
explicit TReleaseGuard(std::vector<std::vector<T>>&& rowsets, adopt_acquire_t)
: _tablet_rowsets(std::move(rowsets)) {}

TReleaseGuard& operator=(TReleaseGuard&& other) noexcept {
std::swap(_tablet_rowsets, other._tablet_rowsets);
return *this;
}
~TReleaseGuard() { reset(); }

void reset() {
for (auto& rowset_list : _tablet_rowsets) {
Rowset::release_readers(rowset_list);
}
_tablet_rowsets.clear();
}
const std::vector<std::vector<T>>& tablet_rowsets() const { return _tablet_rowsets; }

TReleaseGuard(TReleaseGuard&& other) = delete;
TReleaseGuard(const TReleaseGuard& other) = delete;
TReleaseGuard& operator=(const TReleaseGuard& other) = delete;

private:
std::vector<std::vector<T>> _tablet_rowsets;
};
using RowsetReleaseGuard = TReleaseGuard<RowsetSharedPtr>;
using MultiRowsetReleaseGuard = TReleaseGuard<std::vector<std::vector<RowsetSharedPtr>>>;

using TabletSchemaSPtr = std::shared_ptr<TabletSchema>;

} // namespace starrocks
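The refactor keeps the old single-rowset guard as TReleaseGuard<RowsetSharedPtr> and adds a nested-vector specialization whose adopt_acquire_t constructor takes ownership of readers that were already acquired, e.g. under a tablet header lock. A condensed, self-contained sketch of the adopt semantics (Resource stands in for Rowset; names are illustrative):

#include <iostream>
#include <utility>
#include <vector>

// Stand-in resource with acquire/release reader counting, mimicking Rowset.
struct Resource {
    int readers = 0;
    void acquire() { ++readers; }
    void release() { --readers; std::cout << "released\n"; }
};

struct adopt_acquire_t { explicit adopt_acquire_t() = default; };

// Adopts already-acquired resources and releases them exactly once,
// in the destructor or on reset(): the shape of MultiRowsetReleaseGuard.
class ReleaseGuard {
public:
    ReleaseGuard() = default;
    ReleaseGuard(std::vector<Resource*>&& rs, adopt_acquire_t) : _rs(std::move(rs)) {}
    ~ReleaseGuard() { reset(); }
    void reset() {
        for (auto* r : _rs) r->release();
        _rs.clear();
    }

private:
    std::vector<Resource*> _rs;
};

int main() {
    Resource a, b;
    a.acquire();  // acquired up front, e.g. under the tablet header lock
    b.acquire();
    ReleaseGuard guard({&a, &b}, adopt_acquire_t{});  // adopt; do not re-acquire
}  // the guard's destructor releases both exactly once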
fe/fe-core/src/main/java/com/starrocks/common/util/RuntimeProfile.java
@@ -438,6 +438,7 @@ private static String printCounter(long value, TUnit type) {
return builder.toString();
}

// thread-safe
public void addChild(RuntimeProfile child) {
if (child == null) {
return;
@@ -448,6 +449,19 @@ public void addChild(RuntimeProfile child) {
childList.add(pair);
}

// thread-safe
public void addChildren(List<RuntimeProfile> children) {
if (children.isEmpty()) {
return;
}
children.forEach(child -> childMap.put(child.name, child));
List<Pair<RuntimeProfile, Boolean>> childList =
children.stream().map(c -> new Pair<>(c, true)).collect(Collectors.toList());
this.childList.addAll(childList);
}

public void removeChild(String childName) {
RuntimeProfile childProfile = childMap.remove(childName);
if (childProfile == null) {
49 changes: 39 additions & 10 deletions fe/fe-core/src/main/java/com/starrocks/qe/DefaultCoordinator.java
@@ -66,11 +66,15 @@
import com.starrocks.qe.scheduler.Coordinator;
import com.starrocks.qe.scheduler.Deployer;
import com.starrocks.qe.scheduler.QueryRuntimeProfile;
import com.starrocks.qe.scheduler.dag.CriticalAreaRunner;
import com.starrocks.qe.scheduler.dag.ExecutionDAG;
import com.starrocks.qe.scheduler.dag.ExecutionFragment;
import com.starrocks.qe.scheduler.dag.ExecutionSchedule;
import com.starrocks.qe.scheduler.dag.FragmentInstance;
import com.starrocks.qe.scheduler.dag.FragmentInstanceExecState;
import com.starrocks.qe.scheduler.dag.JobSpec;
import com.starrocks.qe.scheduler.dag.PhasedExecutionSchedule;
import com.starrocks.qe.scheduler.dag.TiredExecutionSchedule;
import com.starrocks.qe.scheduler.slot.LogicalSlot;
import com.starrocks.rpc.RpcException;
import com.starrocks.server.GlobalStateMgr;
@@ -154,6 +158,8 @@ public class DefaultCoordinator extends Coordinator {
private boolean isShortCircuit = false;
private boolean isBinaryRow = false;

private ExecutionSchedule schedule;

public static class Factory implements Coordinator.Factory {

@Override
@@ -219,7 +225,8 @@ public DefaultCoordinator createBrokerExportScheduler(Long jobId, TUniqueId quer

@Override
public DefaultCoordinator createRefreshDictionaryCacheScheduler(ConnectContext context, TUniqueId queryId,
DescriptorTable descTable,
List<PlanFragment> fragments,
List<ScanNode> scanNodes) {

JobSpec jobSpec = JobSpec.Factory.fromRefreshDictionaryCacheSpec(context, queryId, descTable, fragments,
@@ -284,15 +291,22 @@ public DefaultCoordinator(JobSpec jobSpec, StreamLoadPlanner planner, TNetworkAd
}

shortCircuitExecutor =
ShortCircuitExecutor.create(context, fragments, scanNodes, descTable, isBinaryRow,
jobSpec.isNeedReport(),
jobSpec.getPlanProtocol(), coordinatorPreprocessor.getWorkerProvider());

if (null != shortCircuitExecutor) {
isShortCircuit = true;
}
if (context.getSessionVariable().enablePhasedScheduler()) {
schedule = new PhasedExecutionSchedule(connectContext);
} else {
schedule = new TiredExecutionSchedule();
}

this.queryProfile =
new QueryRuntimeProfile(connectContext, jobSpec, executionDAG.getFragmentsInCreatedOrder().size(),
isShortCircuit);
}

@Override
@@ -521,6 +535,22 @@ public void startScheduling(boolean needDeploy) throws Exception {
}
}

@Override
public void scheduleNextTurn(TUniqueId fragmentInstanceId) throws Exception {
schedule.tryScheduleNextTurn(getCriticalAreaRunner(), fragmentInstanceId);
}

public CriticalAreaRunner getCriticalAreaRunner() {
return r -> {
lock();
try {
r.run();
} finally {
unlock();
}
};
}
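CriticalAreaRunner lets the schedule run a phase transition inside the coordinator lock without knowing about the lock itself. A rough C++ analogue of the same shape (names hypothetical; the actual interface is the Java functional interface above):

#include <functional>
#include <iostream>
#include <mutex>

// A callable that runs a task while holding the coordinator lock.
using CriticalAreaRunner = std::function<void(const std::function<void()>&)>;

CriticalAreaRunner make_critical_area_runner(std::mutex& mu) {
    return [&mu](const std::function<void()>& task) {
        std::lock_guard<std::mutex> hold(mu);  // lock, run, unlock on scope exit
        task();
    };
}

int main() {
    std::mutex coordinator_lock;
    auto runner = make_critical_area_runner(coordinator_lock);
    runner([] { std::cout << "deploy next phase under the lock\n"; });
}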

@Override
public String getSchedulerExplain() {
return executionDAG.getFragmentsInPreorder().stream()
@@ -541,7 +571,8 @@ private void prepareProfile() {
jobSpec.getQueryOptions().setEnable_profile(true);
}
if (jobSpec.isBrokerLoad() && jobSpec.getQueryOptions().getBig_query_profile_threshold() == 0) {
jobSpec.getQueryOptions()
.setBig_query_profile_threshold(Config.default_big_load_profile_threshold_second * 1000);
}
// runtime load profile does not need to report too frequently
if (jobSpec.getQueryOptions().getRuntime_profile_report_interval() < 30) {
@@ -605,11 +636,9 @@ private void deliverExecFragments(boolean needDeploy) throws RpcException, UserE
try (Timer ignored = Tracers.watchScope(Tracers.Module.SCHEDULER, "DeployLockInternalTime")) {
Deployer deployer =
new Deployer(connectContext, jobSpec, executionDAG, coordinatorPreprocessor.getCoordAddress(),
this::handleErrorExecution, needDeploy);
schedule.prepareSchedule(deployer, executionDAG);
schedule.schedule();
queryProfile.attachExecutionProfiles(executionDAG.getExecutions());
} finally {
unlock();
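This is the behavioral core of the commit: instead of eagerly walking executionDAG.getFragmentsInTopologicalOrderFromRoot() and deploying everything, the coordinator defers to the selected ExecutionSchedule, and the phased variant deploys later fragments only as earlier ones report finishing. A compact sketch of the two strategies (self-contained; Fragment and deploy are stand-ins for the real DAG types):

#include <functional>
#include <iostream>
#include <queue>
#include <vector>

struct Fragment {
    int id;
    std::vector<Fragment*> children;  // downstream fragments in the DAG
};
using DeployFn = std::function<void(Fragment*)>;

// "Tiered": walk the whole DAG up front and deploy every fragment immediately.
void tiered_schedule(Fragment* root, const DeployFn& deploy) {
    std::queue<Fragment*> q;
    q.push(root);
    while (!q.empty()) {
        Fragment* f = q.front();
        q.pop();
        deploy(f);
        for (Fragment* c : f->children) q.push(c);
    }
}

// "Phased": deploy one turn now; the next turn runs when a deployed fragment
// finishes and reports back (cf. scheduleNextTurn / reportFragmentFinish).
void phased_next_turn(Fragment* finished, const DeployFn& deploy) {
    for (Fragment* c : finished->children) deploy(c);
}

int main() {
    Fragment leaf{2, {}};
    Fragment mid{1, {&leaf}};
    Fragment root{0, {&mid}};
    DeployFn deploy = [](Fragment* f) { std::cout << "deploy fragment " << f->id << "\n"; };
    tiered_schedule(&root, deploy);   // tiered: deploys 0, 1, 2 immediately
    deploy(&root);                    // phased: the first turn deploys only the root
    phased_next_turn(&root, deploy);  // a later turn, after the root reports finish
}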
3 changes: 3 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/qe/QeProcessor.java
@@ -26,6 +26,8 @@
import com.starrocks.thrift.TReportAuditStatisticsResult;
import com.starrocks.thrift.TReportExecStatusParams;
import com.starrocks.thrift.TReportExecStatusResult;
import com.starrocks.thrift.TReportFragmentFinishParams;
import com.starrocks.thrift.TReportFragmentFinishResponse;
import com.starrocks.thrift.TUniqueId;

import java.util.List;
@@ -38,6 +40,7 @@ public interface QeProcessor {
TReportAuditStatisticsResult reportAuditStatistics(TReportAuditStatisticsParams params, TNetworkAddress beAddr);

TBatchReportExecStatusResult batchReportExecStatus(TBatchReportExecStatusParams params, TNetworkAddress beAddr);

TReportFragmentFinishResponse reportFragmentFinish(TReportFragmentFinishParams params);

void registerQuery(TUniqueId queryId, Coordinator coord) throws UserException;
