Refine the context functions #3753

Merged · 1 commit · Jan 4, 2022
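
This PR tidies up the `QueryContext` helper functions so that callers work against the context directly instead of reaching through the session manager. Summarized from the diff:

- `QueryContext::new` and `QueryContext::from_shared` become `QueryContext::create_from` and `QueryContext::create_from_shared`.
- `ctx.get_sessions_manager().get_user_manager()` collapses to `ctx.get_user_manager()`.
- `ctx.get_session()` becomes `ctx.get_current_session()`, and looking up a session by id becomes `ctx.get_session_by_id(id)` (backed by `SessionManager::get_session_by_id`).
- `ctx.get_data_accessor()` becomes `ctx.get_storage_accessor()`, and `ctx.get_table_cache()` becomes `ctx.get_storage_cache()`.
- `ctx.get_shared_runtime()` is removed, and `ctx.get_processes_info()` is added.

Two short illustrative sketches of the underlying pattern appear after the transform_create_sets.rs and context.rs diffs below.
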
4 changes: 2 additions & 2 deletions query/src/api/rpc/flight_dispatcher.rs
@@ -126,7 +126,7 @@ impl DatabendQueryFlightDispatcher {
#[tracing::instrument(level = "debug", skip_all, fields(session.id = session.get_id().as_str()))]
async fn one_sink_action(&self, session: SessionRef, action: &FlightAction) -> Result<()> {
let query_context = session.create_context().await?;
let action_context = QueryContext::new(query_context.clone());
let action_context = QueryContext::create_from(query_context.clone());
let pipeline_builder = PipelineBuilder::create(action_context.clone());

let query_plan = action.get_plan();
@@ -182,7 +182,7 @@ impl DatabendQueryFlightDispatcher {
T: FlightScatter + Send + 'static,
{
let query_context = session.create_context().await?;
let action_context = QueryContext::new(query_context.clone());
let action_context = QueryContext::create_from(query_context.clone());
let pipeline_builder = PipelineBuilder::create(action_context.clone());

let query_plan = action.get_plan();
2 changes: 1 addition & 1 deletion query/src/api/rpc/flight_service.rs
@@ -144,7 +144,7 @@ impl FlightService for DatabendQueryFlightService {
FlightAction::CancelAction(action) => {
// We only destroy the streams when the session exists
let session_id = action.query_id.clone();
if let Some(session) = self.sessions.get_session(&session_id) {
if let Some(session) = self.sessions.get_session_by_id(&session_id) {
// TODO: remove streams
session.force_kill_session();
}
1 change: 0 additions & 1 deletion query/src/interpreters/interpreter_describe_stage.rs
@@ -51,7 +51,6 @@ impl Interpreter for DescribeStageInterpreter {
let default_stage = UserStageInfo::default();
let stage = self
.ctx
.get_sessions_manager()
.get_user_manager()
.get_stage(self.plan.name.as_str())
.await?;
2 changes: 1 addition & 1 deletion query/src/interpreters/interpreter_grant_privilege.rs
@@ -56,7 +56,7 @@ impl Interpreter for GrantPrivilegeInterpreter {
// TODO: check user existence
// TODO: check privilege on granting on the grant object

let user_mgr = self.ctx.get_sessions_manager().get_user_manager();
let user_mgr = self.ctx.get_user_manager();
user_mgr
.grant_user_privileges(&plan.name, &plan.hostname, plan.on, plan.priv_types)
.await?;
2 changes: 1 addition & 1 deletion query/src/interpreters/interpreter_kill.rs
@@ -47,7 +47,7 @@ impl Interpreter for KillInterpreter {
_input_stream: Option<SendableDataBlockStream>,
) -> Result<SendableDataBlockStream> {
let id = &self.plan.id;
match self.ctx.get_sessions_manager().get_session(id) {
match self.ctx.get_session_by_id(id) {
None => Err(ErrorCode::UnknownSession(format!(
"Not found session id {}",
id
8 changes: 4 additions & 4 deletions query/src/interpreters/interpreter_query_log.rs
@@ -158,7 +158,7 @@ impl InterpreterQueryLog {

pub async fn log_start(&self) -> Result<()> {
// User.
let handler_type = self.ctx.get_session().get_type();
let handler_type = self.ctx.get_current_session().get_type();
let tenant_id = self.ctx.get_config().query.tenant_id;
let cluster_id = self.ctx.get_config().query.cluster_id;
let user = self.ctx.get_current_user()?;
@@ -191,7 +191,7 @@ impl InterpreterQueryLog {
let result_rows = 0u64;
let result_bytes = 0u64;
let cpu_usage = self.ctx.get_settings().get_max_threads()? as u32;
let memory_usage = self.ctx.get_session().get_memory_usage() as u64;
let memory_usage = self.ctx.get_current_session().get_memory_usage() as u64;

// Client.
let client_address = format!("{:?}", self.ctx.get_client_address());
@@ -240,7 +240,7 @@ impl InterpreterQueryLog {

pub async fn log_finish(&self) -> Result<()> {
// User.
let handler_type = self.ctx.get_session().get_type();
let handler_type = self.ctx.get_current_session().get_type();
let tenant_id = self.ctx.get_config().query.tenant_id;
let cluster_id = self.ctx.get_config().query.cluster_id;
let user = self.ctx.get_current_user()?;
@@ -269,7 +269,7 @@ impl InterpreterQueryLog {
let scan_seeks = dal_metrics.read_seeks as u64;
let scan_seek_cost_ms = dal_metrics.read_seek_cost_ms as u64;
let cpu_usage = self.ctx.get_settings().get_max_threads()? as u32;
let memory_usage = self.ctx.get_session().get_memory_usage() as u64;
let memory_usage = self.ctx.get_current_session().get_memory_usage() as u64;

// Result.
let result_rows = self.ctx.get_result_progress_value().read_rows as u64;
2 changes: 1 addition & 1 deletion query/src/interpreters/interpreter_revoke_privilege.rs
@@ -55,7 +55,7 @@ impl Interpreter for RevokePrivilegeInterpreter {
// TODO: check user existence
// TODO: check privilege on granting on the grant object

let user_mgr = self.ctx.get_sessions_manager().get_user_manager();
let user_mgr = self.ctx.get_user_manager();
user_mgr
.revoke_user_privileges(&plan.username, &plan.hostname, plan.on, plan.priv_types)
.await?;
1 change: 0 additions & 1 deletion query/src/interpreters/interpreter_show_grants.rs
@@ -54,7 +54,6 @@ impl Interpreter for ShowGrantsInterpreter {
None => self.ctx.get_current_user()?,
Some(ref user_identity) => {
self.ctx
.get_sessions_manager()
.get_user_manager()
.get_user(&user_identity.username, &user_identity.hostname)
.await?
2 changes: 1 addition & 1 deletion query/src/interpreters/interpreter_stage_create.rs
@@ -48,7 +48,7 @@ impl Interpreter for CreatStageInterpreter {
_input_stream: Option<SendableDataBlockStream>,
) -> Result<SendableDataBlockStream> {
let plan = self.plan.clone();
let user_mgr = self.ctx.get_sessions_manager().get_user_manager();
let user_mgr = self.ctx.get_user_manager();
let user_stage = plan.user_stage_info;
let create_stage = user_mgr.add_stage(user_stage).await;
if plan.if_not_exists {
2 changes: 1 addition & 1 deletion query/src/interpreters/interpreter_stage_drop.rs
@@ -48,7 +48,7 @@ impl Interpreter for DropStageInterpreter {
_input_stream: Option<SendableDataBlockStream>,
) -> Result<SendableDataBlockStream> {
let plan = self.plan.clone();
let user_mgr = self.ctx.get_sessions_manager().get_user_manager();
let user_mgr = self.ctx.get_user_manager();
user_mgr
.drop_stage(plan.name.as_str(), plan.if_exists)
.await?;
2 changes: 1 addition & 1 deletion query/src/interpreters/interpreter_udf_alter.rs
@@ -48,7 +48,7 @@ impl Interpreter for AlterUDFInterpreter {
_input_stream: Option<SendableDataBlockStream>,
) -> Result<SendableDataBlockStream> {
let plan = self.plan.clone();
let user_mgr = self.ctx.get_sessions_manager().get_user_manager();
let user_mgr = self.ctx.get_user_manager();
user_mgr.update_udf(plan.udf).await?;

Ok(Box::pin(DataBlockStream::create(
2 changes: 1 addition & 1 deletion query/src/interpreters/interpreter_udf_create.rs
@@ -48,7 +48,7 @@ impl Interpreter for CreatUDFInterpreter {
_input_stream: Option<SendableDataBlockStream>,
) -> Result<SendableDataBlockStream> {
let plan = self.plan.clone();
let user_mgr = self.ctx.get_sessions_manager().get_user_manager();
let user_mgr = self.ctx.get_user_manager();
let udf = plan.udf;
let create_udf = user_mgr.add_udf(udf).await;
if plan.if_not_exists {
2 changes: 1 addition & 1 deletion query/src/interpreters/interpreter_udf_drop.rs
@@ -48,7 +48,7 @@ impl Interpreter for DropUDFInterpreter {
_input_stream: Option<SendableDataBlockStream>,
) -> Result<SendableDataBlockStream> {
let plan = self.plan.clone();
let user_mgr = self.ctx.get_sessions_manager().get_user_manager();
let user_mgr = self.ctx.get_user_manager();
user_mgr
.drop_udf(plan.name.as_str(), plan.if_exists)
.await?;
2 changes: 1 addition & 1 deletion query/src/interpreters/interpreter_udf_show.rs
@@ -48,7 +48,7 @@ impl Interpreter for ShowUDFInterpreter {
_input_stream: Option<SendableDataBlockStream>,
) -> Result<SendableDataBlockStream> {
let plan = self.plan.clone();
let user_mgr = self.ctx.get_sessions_manager().get_user_manager();
let user_mgr = self.ctx.get_user_manager();
let udf = user_mgr.get_udf(&plan.name).await?;

let show_fields = vec![
2 changes: 1 addition & 1 deletion query/src/interpreters/interpreter_user_alter.rs
@@ -48,7 +48,7 @@ impl Interpreter for AlterUserInterpreter {
_input_stream: Option<SendableDataBlockStream>,
) -> Result<SendableDataBlockStream> {
let plan = self.plan.clone();
let user_mgr = self.ctx.get_sessions_manager().get_user_manager();
let user_mgr = self.ctx.get_user_manager();
//TODO:alter current user
user_mgr
.update_user(
2 changes: 1 addition & 1 deletion query/src/interpreters/interpreter_user_create.rs
@@ -51,7 +51,7 @@ impl Interpreter for CreateUserInterpreter {
_input_stream: Option<SendableDataBlockStream>,
) -> Result<SendableDataBlockStream> {
let plan = self.plan.clone();
let user_mgr = self.ctx.get_sessions_manager().get_user_manager();
let user_mgr = self.ctx.get_user_manager();
let user_info = UserInfo {
name: plan.name,
hostname: plan.hostname,
2 changes: 1 addition & 1 deletion query/src/interpreters/interpreter_user_drop.rs
@@ -48,7 +48,7 @@ impl Interpreter for DropUserInterpreter {
_input_stream: Option<SendableDataBlockStream>,
) -> Result<SendableDataBlockStream> {
let plan = self.plan.clone();
let user_mgr = self.ctx.get_sessions_manager().get_user_manager();
let user_mgr = self.ctx.get_user_manager();
user_mgr
.drop_user(plan.name.as_str(), plan.hostname.as_str(), plan.if_exists)
.await?;
2 changes: 1 addition & 1 deletion query/src/interpreters/plan_schedulers/plan_scheduler.rs
@@ -639,7 +639,7 @@ impl PlanScheduler {
}

fn visit_subquery(&mut self, plan: &PlanNode, tasks: &mut Tasks) -> Result<Vec<PlanNode>> {
let subquery_context = QueryContext::new(self.query_context.clone());
let subquery_context = QueryContext::create_from(self.query_context.clone());
let mut subquery_scheduler = PlanScheduler::try_create(subquery_context)?;
subquery_scheduler.visit_plan_node(plan, tasks)?;
Ok(subquery_scheduler.nodes_plan)
2 changes: 1 addition & 1 deletion query/src/optimizers/optimizer_scatters.rs
@@ -206,7 +206,7 @@ impl ScattersOptimizerImpl {

impl PlanRewriter for ScattersOptimizerImpl {
fn rewrite_subquery_plan(&mut self, subquery_plan: &PlanNode) -> Result<PlanNode> {
let subquery_ctx = QueryContext::new(self.ctx.clone());
let subquery_ctx = QueryContext::create_from(self.ctx.clone());
let mut subquery_optimizer = ScattersOptimizerImpl::create(subquery_ctx);
let rewritten_subquery = subquery_optimizer.rewrite_plan_node(subquery_plan)?;

2 changes: 1 addition & 1 deletion query/src/pipelines/transforms/transform_create_sets.rs
@@ -171,7 +171,7 @@ impl<'a> SubQueriesPuller<'a> {

fn init(&mut self) -> Result<()> {
for query_expression in &self.expressions {
let subquery_ctx = QueryContext::new(self.ctx.clone());
let subquery_ctx = QueryContext::create_from(self.ctx.clone());

match query_expression {
Expression::Subquery { query_plan, .. } => {
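
In plan_scheduler.rs, optimizer_scatters.rs, and transform_create_sets.rs above, each subquery gets its own context built with `QueryContext::create_from`, which wraps the parent's shared state rather than allocating a new one. A minimal sketch of that sharing pattern, using toy stand-in types (`Ctx`, `SharedState`) rather than the real `QueryContext`/`QueryContextShared`:

```rust
// Simplified sketch (toy types, not Databend's actual API) of the create_from
// pattern: a child context for a subquery wraps the very same shared state as
// its parent instead of building a fresh one.
use std::sync::Arc;

struct SharedState {
    query_id: String,
}

struct Ctx {
    shared: Arc<SharedState>,
}

impl Ctx {
    fn create(query_id: &str) -> Arc<Ctx> {
        Arc::new(Ctx {
            shared: Arc::new(SharedState {
                query_id: query_id.to_string(),
            }),
        })
    }

    // Mirrors the shape of QueryContext::create_from(other).
    fn create_from(other: Arc<Ctx>) -> Arc<Ctx> {
        Arc::new(Ctx {
            shared: other.shared.clone(),
        })
    }
}

fn main() {
    let parent = Ctx::create("query-1");
    let subquery_ctx = Ctx::create_from(parent.clone());

    // Both handles point at the same shared state.
    assert!(Arc::ptr_eq(&parent.shared, &subquery_ctx.shared));
    println!("subquery runs under query id {}", subquery_ctx.shared.query_id);
}
```

Because the inner state is shared, whatever the subquery context records is visible to the context of the query that spawned it.
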
90 changes: 53 additions & 37 deletions query/src/sessions/context.rs
@@ -23,7 +23,6 @@ use std::sync::Arc;
use common_base::tokio::task::JoinHandle;
use common_base::Progress;
use common_base::ProgressValues;
use common_base::Runtime;
use common_base::TrySpawn;
use common_cache::storage::StorageCache;
use common_dal::AzureBlobAccessor;
@@ -52,11 +51,13 @@ use crate::clusters::Cluster;
use crate::configs::AzureStorageBlobConfig;
use crate::configs::Config;
use crate::servers::http::v1::HttpQueryHandle;
use crate::sessions::ProcessInfo;
use crate::sessions::QueryContextShared;
use crate::sessions::Session;
use crate::sessions::SessionManager;
use crate::sessions::SessionRef;
use crate::sessions::Settings;
use crate::storages::Table;
use crate::users::UserApiProvider;

pub struct QueryContext {
version: String,
@@ -66,14 +67,14 @@ pub struct QueryContext {
}

impl QueryContext {
pub fn new(other: Arc<QueryContext>) -> Arc<QueryContext> {
QueryContext::from_shared(other.shared.clone())
pub fn create_from(other: Arc<QueryContext>) -> Arc<QueryContext> {
QueryContext::create_from_shared(other.shared.clone())
}

pub fn from_shared(shared: Arc<QueryContextShared>) -> Arc<QueryContext> {
pub fn create_from_shared(shared: Arc<QueryContextShared>) -> Arc<QueryContext> {
shared.increment_ref_count();

tracing::debug!("Create DatabendQueryContext");
tracing::debug!("Create QueryContext");

Arc::new(QueryContext {
statistics: Arc::new(RwLock::new(Statistics::default())),
@@ -199,10 +200,6 @@ impl QueryContext {
self.shared.get_current_database()
}

pub fn get_current_user(&self) -> Result<UserInfo> {
self.shared.get_current_user()
}

pub async fn set_current_database(&self, new_database_name: String) -> Result<()> {
let catalog = self.get_catalog();
match catalog.get_database(&new_database_name).await {
@@ -218,6 +215,10 @@ impl QueryContext {
Ok(())
}

pub fn get_current_user(&self) -> Result<UserInfo> {
self.shared.get_current_user()
}

pub fn get_fuse_version(&self) -> String {
self.version.clone()
}
Expand All @@ -235,19 +236,54 @@ impl QueryContext {
format!("_subquery_{}", index)
}

pub fn get_sessions_manager(self: &Arc<Self>) -> Arc<SessionManager> {
self.shared.session.get_sessions_manager()
// Get user manager api.
pub fn get_user_manager(self: &Arc<Self>) -> Arc<UserApiProvider> {
self.shared
.session
.get_sessions_manager()
.get_user_manager()
}

pub fn get_session(self: &Arc<Self>) -> Arc<Session> {
// Get the current session.
pub fn get_current_session(self: &Arc<Self>) -> Arc<Session> {
self.shared.session.clone()
}

pub fn get_shared_runtime(&self) -> Result<Arc<Runtime>> {
self.shared.try_get_runtime()
// Get one session by session id.
pub fn get_session_by_id(self: &Arc<Self>, id: &str) -> Option<SessionRef> {
self.shared
.session
.get_sessions_manager()
.get_session_by_id(id)
}

// Get all the processes list info.
pub fn get_processes_info(self: &Arc<Self>) -> Vec<ProcessInfo> {
self.shared.session.get_sessions_manager().processes_info()
}

/// Get the data accessor metrics.
pub fn get_dal_metrics(&self) -> DalMetrics {
self.shared.dal_ctx.get_metrics()
}

/// Get the session running query.
pub fn get_query_str(&self) -> String {
self.shared.get_query_str()
}

// Get the client socket address.
pub fn get_client_address(&self) -> Option<SocketAddr> {
self.shared.session.mutable_state.get_client_host()
}

pub fn get_data_accessor(&self) -> Result<Arc<dyn DataAccessor>> {
// Get table cache.
pub fn get_storage_cache(&self) -> Arc<Option<Box<dyn StorageCache>>> {
self.shared.session.sessions.get_storage_cache()
}

// Get the storage data accessor by config.
pub fn get_storage_accessor(&self) -> Result<Arc<dyn DataAccessor>> {
let storage_conf = &self.get_config().storage;
let scheme_name = &storage_conf.storage_type;
let scheme = StorageScheme::from_str(scheme_name)?;
@@ -279,26 +315,6 @@ impl QueryContext {
da,
)))
}

/// Get the data accessor metrics.
pub fn get_dal_metrics(&self) -> DalMetrics {
self.shared.dal_ctx.get_metrics()
}

/// Get the session running query.
pub fn get_query_str(&self) -> String {
self.shared.get_query_str()
}

// Get the client socket address.
pub fn get_client_address(&self) -> Option<SocketAddr> {
self.shared.session.mutable_state.get_client_host()
}

// Get table cache
pub fn get_table_cache(&self) -> Arc<Option<Box<dyn StorageCache>>> {
self.shared.get_table_cache()
}
}

impl TrySpawn for QueryContext {
@@ -329,7 +345,7 @@ impl QueryContextShared {
pub(in crate::sessions) fn destroy_context_ref(&self) {
if self.ref_count.fetch_sub(1, Ordering::Release) == 1 {
std::sync::atomic::fence(Acquire);
tracing::debug!("Destroy DatabendQueryContext");
tracing::debug!("Destroy QueryContext");
self.session.destroy_context_shared();
}
}
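
The refactored `QueryContext` keeps the session-manager plumbing internal: `get_user_manager`, `get_session_by_id`, and `get_processes_info` all delegate through `self.shared.session.get_sessions_manager()`, so interpreter code only ever touches the context. A rough sketch of that delegation, again with toy stand-in types rather than Databend's real ones:

```rust
// Simplified sketch (toy types, not Databend's actual API) of how the
// context-level accessors hide the hop through the owning session and its
// session manager.
use std::sync::Arc;

struct UserApiProvider;

struct SessionManager {
    user_manager: Arc<UserApiProvider>,
}

impl SessionManager {
    fn get_user_manager(&self) -> Arc<UserApiProvider> {
        self.user_manager.clone()
    }
}

struct Session {
    manager: Arc<SessionManager>,
}

impl Session {
    fn get_sessions_manager(&self) -> Arc<SessionManager> {
        self.manager.clone()
    }
}

struct Ctx {
    session: Arc<Session>,
}

impl Ctx {
    // Mirrors the shape of QueryContext::get_user_manager: one call on the
    // context, with the session-manager hop kept inside.
    fn get_user_manager(&self) -> Arc<UserApiProvider> {
        self.session.get_sessions_manager().get_user_manager()
    }
}

fn main() {
    let manager = Arc::new(SessionManager {
        user_manager: Arc::new(UserApiProvider),
    });
    let session = Arc::new(Session { manager });
    let ctx = Ctx { session };
    let _user_mgr = ctx.get_user_manager();
}
```

With this shape, callers such as the stage, UDF, and user interpreters above shrink to a single `ctx.get_user_manager()` call.
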
5 changes: 0 additions & 5 deletions query/src/sessions/context_shared.rs
@@ -19,7 +19,6 @@ use std::sync::Arc;

use common_base::Progress;
use common_base::Runtime;
use common_cache::storage::StorageCache;
use common_dal::DalContext;
use common_exception::ErrorCode;
use common_exception::Result;
@@ -202,10 +201,6 @@ impl QueryContextShared {
let mut sources_abort_handle = self.sources_abort_handle.write();
sources_abort_handle.push(handle);
}

pub fn get_table_cache(&self) -> Arc<Option<Box<dyn StorageCache>>> {
self.session.sessions.get_table_cache()
}
}

impl Session {