Skip to content

Commit

Permalink
Merge branch 'main' into wcy/row_based_batch
Browse files Browse the repository at this point in the history
  • Loading branch information
mergify[bot] authored Aug 2, 2022
2 parents 802744a + 4269183 commit 23bb49b
Show file tree
Hide file tree
Showing 73 changed files with 2,386 additions and 1,650 deletions.
2 changes: 1 addition & 1 deletion e2e_test/batch/catalog/pg_namespace.slt.part
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
query IT rowsort
SELECT nspname FROM pg_catalog.pg_namespace;
----
pg_catalog
public
pg_catalog
13 changes: 9 additions & 4 deletions proto/hummock.proto
Original file line number Diff line number Diff line change
Expand Up @@ -228,11 +228,16 @@ message HummockPinnedSnapshot {
uint64 minimal_pinned_snapshot = 2;
}

message GetNewTableIdRequest {}
message GetNewSstIdsRequest {
uint32 number = 1;
}

message GetNewTableIdResponse {
message GetNewSstIdsResponse {
common.Status status = 1;
uint64 table_id = 2;
// inclusive
uint64 start_id = 2;
// exclusive
uint64 end_id = 3;
}

message SubscribeCompactTasksRequest {
Expand Down Expand Up @@ -283,7 +288,7 @@ service HummockManagerService {
rpc GetEpoch(GetEpochRequest) returns (GetEpochResponse);
rpc UnpinSnapshot(UnpinSnapshotRequest) returns (UnpinSnapshotResponse);
rpc UnpinSnapshotBefore(UnpinSnapshotBeforeRequest) returns (UnpinSnapshotBeforeResponse);
rpc GetNewTableId(GetNewTableIdRequest) returns (GetNewTableIdResponse);
rpc GetNewSstIds(GetNewSstIdsRequest) returns (GetNewSstIdsResponse);
rpc SubscribeCompactTasks(SubscribeCompactTasksRequest) returns (stream SubscribeCompactTasksResponse);
rpc ReportVacuumTask(ReportVacuumTaskRequest) returns (ReportVacuumTaskResponse);
rpc GetCompactionGroups(GetCompactionGroupsRequest) returns (GetCompactionGroupsResponse);
Expand Down
6 changes: 1 addition & 5 deletions src/bench/ss_bench/operations/write_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,7 @@ impl Operations {
if let Some((compact_context, local_version_manager)) = context {
if let Some(task) = self.meta_client.get_compact_task().await {
Compactor::compact(compact_context.clone(), task).await;
// FIXME: A workaround to ensure the version after compaction is available
// locally. Notice now multiple tasks are trying to pin_version, which breaks
// the assumption required by LocalVersionManager. It may result in some pinned
// versions never get unpinned. This can be fixed after
// LocalVersionManager::start_workers is modified into push-based.
// Ensure the version after compaction is available locally.
let last_pinned_id = local_version_manager.get_pinned_version().id();
let version = self.meta_client.pin_version(last_pinned_id).await.unwrap();
local_version_manager.try_update_pinned_version(Some(last_pinned_id), version);
Expand Down
8 changes: 8 additions & 0 deletions src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,10 @@ pub struct StorageConfig {
/// Capacity of sstable meta cache.
#[serde(default = "default::compactor_memory_limit_mb")]
pub compactor_memory_limit_mb: usize,

/// Number of SST ids fetched from meta per RPC
#[serde(default = "default::sstable_id_remote_fetch_number")]
pub sstable_id_remote_fetch_number: u32,
}

impl Default for StorageConfig {
Expand Down Expand Up @@ -297,6 +301,10 @@ mod default {
pub fn compactor_memory_limit_mb() -> usize {
512
}

pub fn sstable_id_remote_fetch_number() -> u32 {
10
}
}

pub mod constant {
Expand Down
1 change: 1 addition & 0 deletions src/compute/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ pub async fn compute_node_serve(
Some(Arc::new(CompactionExecutor::new(Some(1)))),
table_id_to_slice_transform.clone(),
memory_limiter.clone(),
storage.sstable_id_manager(),
);
sub_tasks.push((handle, shutdown_sender));
}
Expand Down
84 changes: 82 additions & 2 deletions src/frontend/src/catalog/pg_catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ use risingwave_common::array::Row;
use risingwave_common::catalog::{ColumnDesc, SysCatalogReader, TableId, DEFAULT_SUPER_USER_ID};
use risingwave_common::error::{ErrorCode, Result};
use risingwave_common::types::{DataType, ScalarImpl};
use risingwave_pb::user::grant_privilege::{Action, Object};
use risingwave_pb::user::UserInfo;
use serde_json::json;

use crate::catalog::catalog_service::CatalogReader;
Expand All @@ -42,7 +44,9 @@ use crate::catalog::system_catalog::SystemCatalog;
use crate::meta_client::FrontendMetaClient;
use crate::scheduler::worker_node_manager::WorkerNodeManagerRef;
use crate::session::AuthContext;
use crate::user::user_privilege::available_prost_privilege;
use crate::user::user_service::UserInfoReader;
use crate::user::UserId;

#[expect(dead_code)]
pub struct SysCatalogReaderImpl {
Expand Down Expand Up @@ -92,17 +96,93 @@ impl SysCatalogReader for SysCatalogReaderImpl {
}
}

/// get acl items of `object` in string, ignore public.
fn get_acl_items(
object: &Object,
users: &Vec<UserInfo>,
username_map: &HashMap<UserId, String>,
) -> String {
let mut res = String::from("{");
let mut empty_flag = true;
let super_privilege = available_prost_privilege(object.clone());
for user in users {
let privileges = if user.get_is_supper() {
vec![&super_privilege]
} else {
user.get_grant_privileges()
.iter()
.filter(|&privilege| privilege.object.as_ref().unwrap() == object)
.collect_vec()
};
if privileges.is_empty() {
continue;
};
let mut grantor_map = HashMap::new();
privileges.iter().for_each(|&privilege| {
privilege.action_with_opts.iter().for_each(|ao| {
grantor_map.entry(ao.granted_by).or_insert_with(Vec::new);
grantor_map
.get_mut(&ao.granted_by)
.unwrap()
.push((ao.action, ao.with_grant_option));
})
});
for key in grantor_map.keys() {
if empty_flag {
empty_flag = false;
} else {
res.push(',');
}
res.push_str(user.get_name());
res.push('=');
grantor_map
.get(key)
.unwrap()
.iter()
.for_each(|(action, option)| {
let str = match Action::from_i32(*action).unwrap() {
Action::Select => "r",
Action::Insert => "a",
Action::Update => "w",
Action::Delete => "d",
Action::Create => "C",
Action::Connect => "c",
_ => unreachable!(),
};
res.push_str(str);
if *option {
res.push('*');
}
});
res.push('/');
// should be able to query grantor's name
res.push_str(username_map.get(key).as_ref().unwrap());
}
}
res.push('}');
res
}
impl SysCatalogReaderImpl {
fn read_namespace(&self) -> Result<Vec<Row>> {
let reader = self.catalog_reader.read_guard();
let schemas = reader.get_all_schema_info(&self.auth_context.database)?;
let schemas = self
.catalog_reader
.read_guard()
.get_all_schema_info(&self.auth_context.database)?;
let user_reader = self.user_info_reader.read_guard();
let users = user_reader.get_all_users();
let username_map = user_reader.get_user_name_map();
Ok(schemas
.iter()
.map(|schema| {
Row::new(vec![
Some(ScalarImpl::Int32(schema.id as i32)),
Some(ScalarImpl::Utf8(schema.name.clone())),
Some(ScalarImpl::Int32(schema.owner as i32)),
Some(ScalarImpl::Utf8(get_acl_items(
&Object::SchemaId(schema.id),
&users,
username_map,
))),
])
})
.collect_vec())
Expand Down
3 changes: 2 additions & 1 deletion src/frontend/src/catalog/pg_catalog/pg_namespace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,6 @@ pub const PG_NAMESPACE_TABLE_NAME: &str = "pg_namespace";
pub const PG_NAMESPACE_COLUMNS: &[PgCatalogColumnsDef] = &[
(DataType::Int32, "oid"),
(DataType::Varchar, "nspname"),
(DataType::Int32, "nspowner"), // TODO: support ACL here.
(DataType::Int32, "nspowner"),
(DataType::Varchar, "nspacl"),
];
83 changes: 9 additions & 74 deletions src/frontend/src/handler/handle_privilege.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,73 +14,15 @@

use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::error::{ErrorCode, Result};
use risingwave_pb::user::grant_privilege::{
Action as ProstAction, ActionWithGrantOption, Object as ProstObject,
};
use risingwave_pb::user::grant_privilege::{ActionWithGrantOption, Object as ProstObject};
use risingwave_pb::user::GrantPrivilege as ProstPrivilege;
use risingwave_sqlparser::ast::{Action, GrantObjects, Privileges, Statement};
use risingwave_sqlparser::ast::{GrantObjects, Privileges, Statement};

use crate::binder::Binder;
use crate::session::{OptimizerContext, SessionImpl};

// TODO: add user_privilege mod under user manager and move check and expand logic there, and bitmap
// impl for privilege check.
static AVAILABLE_ACTION_ON_DATABASE: &[Action] = &[Action::Connect, Action::Create];
static AVAILABLE_ACTION_ON_SCHEMA: &[Action] = &[Action::Create];
static AVAILABLE_ACTION_ON_SOURCE: &[Action] = &[
Action::Select { columns: None },
Action::Update { columns: None },
Action::Insert { columns: None },
Action::Delete,
];
static AVAILABLE_ACTION_ON_MVIEW: &[Action] = &[Action::Select { columns: None }];

pub(crate) fn check_privilege_type(privilege: &Privileges, objects: &GrantObjects) -> Result<()> {
match privilege {
Privileges::All { .. } => Ok(()),
Privileges::Actions(actions) => {
let valid = match objects {
GrantObjects::Databases(_) => actions
.iter()
.all(|action| AVAILABLE_ACTION_ON_DATABASE.contains(action)),
GrantObjects::Schemas(_) => actions
.iter()
.all(|action| AVAILABLE_ACTION_ON_SCHEMA.contains(action)),
GrantObjects::Sources(_) | GrantObjects::AllSourcesInSchema { .. } => actions
.iter()
.all(|action| AVAILABLE_ACTION_ON_SOURCE.contains(action)),
GrantObjects::Mviews(_) | GrantObjects::AllMviewsInSchema { .. } => actions
.iter()
.all(|action| AVAILABLE_ACTION_ON_MVIEW.contains(action)),
_ => true,
};
if !valid {
return Err(ErrorCode::BindError(
"Invalid privilege type for the given object.".to_string(),
)
.into());
}

Ok(())
}
}
}

pub(crate) fn available_privilege_actions(objects: &GrantObjects) -> Result<Vec<Action>> {
match objects {
GrantObjects::Databases(_) => Ok(AVAILABLE_ACTION_ON_DATABASE.to_vec()),
GrantObjects::Schemas(_) => Ok(AVAILABLE_ACTION_ON_SCHEMA.to_vec()),
GrantObjects::Sources(_) | GrantObjects::AllSourcesInSchema { .. } => {
Ok(AVAILABLE_ACTION_ON_SOURCE.to_vec())
}
GrantObjects::Mviews(_) | GrantObjects::AllMviewsInSchema { .. } => {
Ok(AVAILABLE_ACTION_ON_MVIEW.to_vec())
}
_ => Err(
ErrorCode::BindError("Invalid privilege type for the given object.".to_string()).into(),
),
}
}
use crate::user::user_privilege::{
available_privilege_actions, check_privilege_type, get_prost_action,
};

fn make_prost_privilege(
session: &SessionImpl,
Expand Down Expand Up @@ -154,15 +96,7 @@ fn make_prost_privilege(
let action_with_opts = actions
.iter()
.map(|action| {
let prost_action = match action {
Action::Select { .. } => ProstAction::Select,
Action::Insert { .. } => ProstAction::Insert,
Action::Update { .. } => ProstAction::Update,
Action::Delete { .. } => ProstAction::Delete,
Action::Connect => ProstAction::Connect,
Action::Create => ProstAction::Create,
_ => unreachable!(),
};
let prost_action = get_prost_action(action);
ActionWithGrantOption {
action: prost_action as i32,
granted_by: session.user_id(),
Expand Down Expand Up @@ -272,6 +206,7 @@ pub async fn handle_revoke_privilege(
#[cfg(test)]
mod tests {
use risingwave_common::catalog::DEFAULT_SUPER_USER_ID;
use risingwave_pb::user::grant_privilege::Action;

use super::*;
use crate::test_utils::LocalFrontend;
Expand Down Expand Up @@ -309,12 +244,12 @@ mod tests {
vec![ProstPrivilege {
action_with_opts: vec![
ActionWithGrantOption {
action: ProstAction::Connect as i32,
action: Action::Connect as i32,
with_grant_option: true,
granted_by: DEFAULT_SUPER_USER_ID,
},
ActionWithGrantOption {
action: ProstAction::Create as i32,
action: Action::Create as i32,
with_grant_option: true,
granted_by: DEFAULT_SUPER_USER_ID,
}
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/user/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

pub(crate) mod user_authentication;
pub(crate) mod user_manager;
pub mod user_privilege;
pub(crate) mod user_service;

pub type UserId = u32;
Expand Down
4 changes: 4 additions & 0 deletions src/frontend/src/user/user_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ impl UserInfoManager {
self.user_name_by_id.get(&id).cloned()
}

pub fn get_user_name_map(&self) -> &HashMap<UserId, String> {
&self.user_name_by_id
}

pub fn create_user(&mut self, user_info: UserInfo) {
let id = user_info.id;
let name = user_info.name.clone();
Expand Down
Loading

0 comments on commit 23bb49b

Please sign in to comment.