Skip to content

Commit

Permalink
[Feature] sys.object_dependencies (#35060)
Browse files Browse the repository at this point in the history
Signed-off-by: Murphy <[email protected]>
(cherry picked from commit 9756c20)
Signed-off-by: Murphy <[email protected]>
  • Loading branch information
murphyatwork committed Dec 14, 2023
1 parent 9992179 commit 4b1ade9
Show file tree
Hide file tree
Showing 15 changed files with 455 additions and 2 deletions.
6 changes: 6 additions & 0 deletions be/src/exec/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,12 @@ set(EXEC_FILES
schema_scanner/starrocks_role_edges_scanner.cpp
schema_scanner/starrocks_grants_to_scanner.cpp
schema_scanner/schema_helper.cpp
<<<<<<< HEAD
=======
schema_scanner/schema_routine_load_jobs_scanner.cpp
schema_scanner/schema_stream_loads_scanner.cpp
schema_scanner/sys_object_dependencies.cpp
>>>>>>> 9756c20447 ([Feature] sys.object_dependencies (#35060))
jdbc_scanner.cpp
sorting/compare_column.cpp
sorting/merge_column.cpp
Expand Down
40 changes: 40 additions & 0 deletions be/src/exec/schema_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,13 @@
#include "exec/schema_scanner/schema_views_scanner.h"
#include "exec/schema_scanner/starrocks_grants_to_scanner.h"
#include "exec/schema_scanner/starrocks_role_edges_scanner.h"
<<<<<<< HEAD

=======
#include "exec/schema_scanner/sys_object_dependencies.h"
#include "gen_cpp/Descriptors_types.h"
#include "gen_cpp/FrontendService_types.h"
>>>>>>> 9756c20447 ([Feature] sys.object_dependencies (#35060))
namespace starrocks {

StarRocksServer* SchemaScanner::_s_starrocks_server;
Expand Down Expand Up @@ -161,6 +167,19 @@ std::unique_ptr<SchemaScanner> SchemaScanner::create(TSchemaTableType::type type
return std::make_unique<StarrocksGrantsToScanner>(TGrantsToType::ROLE);
case TSchemaTableType::STARROCKS_GRANT_TO_USERS:
return std::make_unique<StarrocksGrantsToScanner>(TGrantsToType::USER);
<<<<<<< HEAD
=======
case TSchemaTableType::STARROCKS_OBJECT_DEPENDENCIES:
return std::make_unique<SysObjectDependencies>();
case TSchemaTableType::SCH_ROUTINE_LOAD_JOBS:
return std::make_unique<SchemaRoutineLoadJobsScanner>();
case TSchemaTableType::SCH_STREAM_LOADS:
return std::make_unique<SchemaStreamLoadsScanner>();
case TSchemaTableType::SCH_PIPE_FILES:
return std::make_unique<SchemaTablePipeFiles>();
case TSchemaTableType::SCH_PIPES:
return std::make_unique<SchemaTablePipes>();
>>>>>>> 9756c20447 ([Feature] sys.object_dependencies (#35060))
default:
return std::make_unique<SchemaDummyScanner>();
}
Expand Down Expand Up @@ -221,4 +240,25 @@ Status SchemaScanner::_create_slot_descs(ObjectPool* pool) {
return Status::OK();
}

TAuthInfo SchemaScanner::build_auth_info() {
TAuthInfo auth_info;
if (nullptr != _param->catalog) {
auth_info.__set_catalog_name(*(_param->catalog));
}
if (nullptr != _param->db) {
auth_info.__set_pattern(*(_param->db));
}
if (nullptr != _param->current_user_ident) {
auth_info.__set_current_user_ident(*(_param->current_user_ident));
} else {
if (nullptr != _param->user) {
auth_info.__set_user(*(_param->user));
}
if (nullptr != _param->user_ip) {
auth_info.__set_user_ip(*(_param->user_ip));
}
}
return auth_info;
}

} // namespace starrocks
2 changes: 2 additions & 0 deletions be/src/exec/schema_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ class SchemaScanner {
// factory function
static std::unique_ptr<SchemaScanner> create(TSchemaTableType::type type);

TAuthInfo build_auth_info();

static void set_starrocks_server(StarRocksServer* starrocks_server) { _s_starrocks_server = starrocks_server; }

const std::vector<SlotDescriptor*>& get_slot_descs() { return _slot_descs; }
Expand Down
21 changes: 21 additions & 0 deletions be/src/exec/schema_scanner/schema_helper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,27 @@ Status SchemaHelper::list_materialized_view_status(const std::string& ip, const
});
}

<<<<<<< HEAD
=======
Status SchemaHelper::list_pipes(const std::string& ip, int32_t port, const TListPipesParams& req,
TListPipesResult* res) {
return ThriftRpcHelper::rpc<FrontendServiceClient>(
ip, port, [&req, &res](FrontendServiceConnection& client) { client->listPipes(*res, req); });
}

Status SchemaHelper::list_pipe_files(const std::string& ip, int32_t port, const TListPipeFilesParams& req,
TListPipeFilesResult* res) {
return ThriftRpcHelper::rpc<FrontendServiceClient>(
ip, port, [&req, &res](FrontendServiceConnection& client) { client->listPipeFiles(*res, req); });
}

Status SchemaHelper::list_object_dependencies(const std::string& ip, int32_t port, const TObjectDependencyReq& req,
TObjectDependencyRes* res) {
return ThriftRpcHelper::rpc<FrontendServiceClient>(
ip, port, [&req, &res](FrontendServiceConnection& client) { client->listObjectDependencies(*res, req); });
}

>>>>>>> 9756c20447 ([Feature] sys.object_dependencies (#35060))
Status SchemaHelper::get_tables_info(const std::string& ip, const int32_t port, const TGetTablesInfoRequest& request,
TGetTablesInfoResponse* response) {
return ThriftRpcHelper::rpc<FrontendServiceClient>(
Expand Down
10 changes: 10 additions & 0 deletions be/src/exec/schema_scanner/schema_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,16 @@ class SchemaHelper {
const TGetTablesParams& request,
TListMaterializedViewStatusResult* result);

<<<<<<< HEAD
=======
static Status list_pipes(const std::string& ip, int32_t port, const TListPipesParams& req, TListPipesResult* res);
static Status list_pipe_files(const std::string& ip, int32_t port, const TListPipeFilesParams& req,
TListPipeFilesResult* res);

static Status list_object_dependencies(const std::string& ip, int32_t port, const TObjectDependencyReq& req,
TObjectDependencyRes* res);

>>>>>>> 9756c20447 ([Feature] sys.object_dependencies (#35060))
static Status get_tables_info(const std::string& ip, const int32_t port, const TGetTablesInfoRequest& request,
TGetTablesInfoResponse* response);

Expand Down
86 changes: 86 additions & 0 deletions be/src/exec/schema_scanner/sys_object_dependencies.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "exec/schema_scanner/sys_object_dependencies.h"

#include "exec/schema_scanner/schema_helper.h"
#include "exec/schema_scanner/starrocks_grants_to_scanner.h"
#include "gen_cpp/FrontendService_types.h"
#include "runtime/runtime_state.h"
#include "runtime/string_value.h"
#include "types/logical_type.h"

namespace starrocks {

SchemaScanner::ColumnDesc SysObjectDependencies::_s_columns[] = {
{"OBJECT_ID", TYPE_BIGINT, sizeof(int64_t), false},
{"OBJECT_NAME", TYPE_VARCHAR, sizeof(StringValue), false},
{"OBJECT_DATABASE", TYPE_VARCHAR, sizeof(StringValue), false},
{"OBJECT_CATALOG", TYPE_VARCHAR, sizeof(StringValue), false},
{"OBJECT_TYPE", TYPE_VARCHAR, sizeof(StringValue), false},

{"REF_OBJECT_ID", TYPE_BIGINT, sizeof(int64_t), false},
{"REF_OBJECT_NAME", TYPE_VARCHAR, sizeof(StringValue), false},
{"REF_OBJECT_DATABASE", TYPE_VARCHAR, sizeof(StringValue), false},
{"REF_OBJECT_CATALOG", TYPE_VARCHAR, sizeof(StringValue), false},
{"REF_OBJECT_TYPE", TYPE_VARCHAR, sizeof(StringValue), false},
};

SysObjectDependencies::SysObjectDependencies()
: SchemaScanner(_s_columns, sizeof(_s_columns) / sizeof(SchemaScanner::ColumnDesc)) {}

SysObjectDependencies::~SysObjectDependencies() = default;

Status SysObjectDependencies::start(RuntimeState* state) {
RETURN_IF(!_is_init, Status::InternalError("used before initialized."));
RETURN_IF(!_param->ip || !_param->port, Status::InternalError("IP or port not exists"));

TAuthInfo auth = build_auth_info();
TObjectDependencyReq request;
request.__set_auth_info(auth);

return (SchemaHelper::list_object_dependencies(*(_param->ip), _param->port, request, &_result));
}

Status SysObjectDependencies::_fill_chunk(ChunkPtr* chunk) {
auto& slot_id_map = (*chunk)->get_slot_id_to_index_map();
const TObjectDependencyItem& info = _result.items[_index];
DatumArray datum_array{
(info.object_id), Slice(info.object_name), Slice(info.database),
Slice(info.catalog), Slice(info.object_type),

(info.ref_object_id), Slice(info.ref_object_name), Slice(info.ref_database),
Slice(info.ref_catalog), Slice(info.ref_object_type),
};
for (const auto& [slot_id, index] : slot_id_map) {
Column* column = (*chunk)->get_column_by_slot_id(slot_id).get();
column->append_datum(datum_array[slot_id - 1]);
}
_index++;
return {};
}

Status SysObjectDependencies::get_next(ChunkPtr* chunk, bool* eos) {
RETURN_IF(!_is_init, Status::InternalError("Used before initialized."));
RETURN_IF((nullptr == chunk || nullptr == eos), Status::InternalError("input pointer is nullptr."));

if (_index >= _result.items.size()) {
*eos = true;
return Status::OK();
}
*eos = false;
return _fill_chunk(chunk);
}

} // namespace starrocks
37 changes: 37 additions & 0 deletions be/src/exec/schema_scanner/sys_object_dependencies.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include "exec/schema_scanner.h"
#include "gen_cpp/FrontendService_types.h"

namespace starrocks {

class SysObjectDependencies : public SchemaScanner {
public:
SysObjectDependencies();
~SysObjectDependencies() override;
Status start(RuntimeState* state) override;
Status get_next(ChunkPtr* chunk, bool* eos) override;

private:
Status _fill_chunk(ChunkPtr* chunk);

size_t _index = 0;
TObjectDependencyRes _result;
static SchemaScanner::ColumnDesc _s_columns[];
};

} // namespace starrocks
Original file line number Diff line number Diff line change
Expand Up @@ -102,4 +102,6 @@ public class SystemId {
public static final long ROLE_EDGES_ID = 101L;
public static final long GRANTS_TO_ROLES_ID = 102L;
public static final long GRANTS_TO_USERS_ID = 103L;
public static final long OBJECT_DEPENDENCIES = 104L;

}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public SysDb() {
super.registerTableUnlocked(RoleEdges.create());
super.registerTableUnlocked(GrantsTo.createGrantsToRoles());
super.registerTableUnlocked(GrantsTo.createGrantsToUsers());
super.registerTableUnlocked(SysObjectDependencies.create());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.starrocks.catalog.system.sys;

import com.starrocks.catalog.BaseTableInfo;
import com.starrocks.catalog.Database;
import com.starrocks.catalog.InternalCatalog;
import com.starrocks.catalog.MaterializedView;
import com.starrocks.catalog.ScalarType;
import com.starrocks.catalog.Table;
import com.starrocks.catalog.system.SystemId;
import com.starrocks.catalog.system.SystemTable;
import com.starrocks.privilege.AccessDeniedException;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.sql.analyzer.Authorizer;
import com.starrocks.sql.ast.UserIdentity;
import com.starrocks.thrift.TAuthInfo;
import com.starrocks.thrift.TObjectDependencyItem;
import com.starrocks.thrift.TObjectDependencyReq;
import com.starrocks.thrift.TObjectDependencyRes;
import com.starrocks.thrift.TSchemaTableType;
import org.apache.commons.collections4.CollectionUtils;

import java.util.Collection;
import java.util.Optional;

public class SysObjectDependencies {

public static final String NAME = "object_dependencies";

public static SystemTable create() {
return new SystemTable(SystemId.OBJECT_DEPENDENCIES, NAME, Table.TableType.SCHEMA,
SystemTable.builder()
.column("object_id", ScalarType.BIGINT)
.column("object_name", ScalarType.createVarcharType(SystemTable.NAME_CHAR_LEN))
.column("object_database", ScalarType.createVarcharType(SystemTable.NAME_CHAR_LEN))
.column("object_catalog", ScalarType.createVarcharType(SystemTable.NAME_CHAR_LEN))
.column("object_type", ScalarType.createVarcharType(64))

.column("ref_object_id", ScalarType.BIGINT)
.column("ref_object_name", ScalarType.createVarcharType(SystemTable.NAME_CHAR_LEN))
.column("ref_object_database", ScalarType.createVarcharType(SystemTable.NAME_CHAR_LEN))
.column("ref_object_catalog", ScalarType.createVarcharType(SystemTable.NAME_CHAR_LEN))
.column("ref_object_type", ScalarType.createVarcharType(64))
.build(),
TSchemaTableType.STARROCKS_OBJECT_DEPENDENCIES);
}

public static TObjectDependencyRes listObjectDependencies(TObjectDependencyReq req) {
TAuthInfo auth = req.getAuth_info();
TObjectDependencyRes response = new TObjectDependencyRes();

UserIdentity currentUser;
if (auth.isSetCurrent_user_ident()) {
currentUser = UserIdentity.fromThrift(auth.getCurrent_user_ident());
} else {
currentUser = UserIdentity.createAnalyzedUserIdentWithIp(auth.getUser(), auth.getUser_ip());
}

// list dependencies of mv
Collection<Database> dbs = GlobalStateMgr.getCurrentState().getFullNameToDb().values();
for (Database db : CollectionUtils.emptyIfNull(dbs)) {
String catalog = Optional.ofNullable(db.getCatalogName())
.orElse(InternalCatalog.DEFAULT_INTERNAL_CATALOG_NAME);
for (Table table : db.getTables()) {
// Only show tables with privilege
try {
Authorizer.checkAnyActionOnTableLikeObject(currentUser, null, db.getFullName(), table);
} catch (AccessDeniedException e) {
continue;
}

if (table.isMaterializedView()) {
MaterializedView mv = (MaterializedView) table;
for (BaseTableInfo refObj : CollectionUtils.emptyIfNull(mv.getBaseTableInfos())) {
TObjectDependencyItem item = new TObjectDependencyItem();
item.setObject_id(mv.getId());
item.setObject_name(mv.getName());
item.setDatabase(db.getFullName());
item.setCatalog(catalog);
item.setObject_type(mv.getType().toString());

item.setRef_object_id(refObj.getTableId());
item.setRef_object_name(refObj.getTableName());
item.setRef_database(refObj.getDbName());
item.setRef_catalog(refObj.getCatalogName());
item.setRef_object_type(Optional.ofNullable(refObj.getTable())
.map(x -> x.getType().toString())
.orElse("UNKNOWN"));

response.addToItems(item);
}
}
}
}

return response;
}

}
Loading

0 comments on commit 4b1ade9

Please sign in to comment.