Commit 853a8c5

fix(processor): try fix test failure
zhang2014 committed Oct 3, 2022
1 parent 6f76776 commit 853a8c5
Showing 6 changed files with 21 additions and 58 deletions.
8 changes: 0 additions & 8 deletions src/query/catalog/src/table_context.rs
@@ -18,7 +18,6 @@ use std::sync::Arc;
 
 use common_base::base::Progress;
 use common_base::base::ProgressValues;
-use common_base::base::Runtime;
 use common_config::Config;
 use common_contexts::DalContext;
 use common_contexts::DalMetrics;
@@ -66,10 +65,6 @@ pub trait TableContext: Send + Sync {
     fn get_write_progress_value(&self) -> ProgressValues;
     fn get_result_progress(&self) -> Arc<Progress>;
     fn get_result_progress_value(&self) -> ProgressValues;
-    // Steal n partitions from the partition pool by the pipeline worker.
-    // This also can steal the partitions from distributed node.
-    fn try_get_partitions(&self, num: u64) -> Result<Partitions>;
-
     fn try_get_part(&self) -> Option<PartInfoPtr>;
     // Update the context partition pool from the pipeline builder.
     fn try_set_partitions(&self, partitions: Partitions) -> Result<()>;
@@ -107,7 +102,4 @@ pub trait TableContext: Send + Sync {
     async fn get_table(&self, catalog: &str, database: &str, table: &str)
         -> Result<Arc<dyn Table>>;
     fn get_processes_info(&self) -> Vec<ProcessInfo>;
-    fn get_runtime(&self) -> Result<Arc<Runtime>>;
-
-    fn clone_inner(&self) -> Arc<dyn TableContext>;
 }
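
With try_get_partitions removed from the trait, pipeline workers are left with the single-part path: the builder fills the pool through try_set_partitions and each worker calls try_get_part repeatedly until it returns None. A minimal standalone sketch of that pulling pattern (plain Rust, not databend code; PartitionPool and the PartInfoPtr alias here are illustrative stand-ins):

    use std::collections::VecDeque;
    use std::sync::{Arc, RwLock};

    // Illustrative stand-in for databend's PartInfoPtr.
    type PartInfoPtr = Arc<String>;

    // Illustrative stand-in for the context-owned partition pool behind
    // try_set_partitions / try_get_part.
    #[derive(Default)]
    struct PartitionPool {
        queue: RwLock<VecDeque<PartInfoPtr>>,
    }

    impl PartitionPool {
        // The pipeline builder fills the pool once.
        fn try_set_partitions(&self, parts: Vec<PartInfoPtr>) {
            *self.queue.write().unwrap() = parts.into();
        }

        // Each worker pulls a single part per call; None means the pool is drained.
        fn try_get_part(&self) -> Option<PartInfoPtr> {
            self.queue.write().unwrap().pop_front()
        }
    }

    fn main() {
        let pool = Arc::new(PartitionPool::default());
        pool.try_set_partitions((0..4).map(|i| Arc::new(format!("part-{i}"))).collect());

        // Worker loop: keep asking for one part until the pool runs dry.
        while let Some(part) = pool.try_get_part() {
            println!("processing {part}");
        }
    }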
3 changes: 2 additions & 1 deletion src/query/service/src/api/rpc/exchange/exchange_manager.rs
@@ -668,7 +668,8 @@ impl FragmentCoordinator {
 
         match &self.payload {
             FragmentPayload::PlanV2(plan) => {
-                let pipeline_builder = PipelineBuilderV2::create(ctx);
+                let new_ctx = QueryContext::create_from(ctx);
+                let pipeline_builder = PipelineBuilderV2::create(new_ctx);
                 self.pipeline_build_res = Some(pipeline_builder.finalize(plan)?);
             }
         };
34 changes: 2 additions & 32 deletions src/query/service/src/sessions/query_ctx.rs
@@ -25,7 +25,6 @@ use chrono_tz::Tz;
 use common_base::base::tokio::task::JoinHandle;
 use common_base::base::Progress;
 use common_base::base::ProgressValues;
-use common_base::base::Runtime;
 use common_base::base::TrySpawn;
 use common_contexts::DalContext;
 use common_contexts::DalMetrics;
@@ -69,7 +68,6 @@ pub struct QueryContext {
     version: String,
     partition_queue: Arc<RwLock<VecDeque<PartInfoPtr>>>,
     shared: Arc<QueryContextShared>,
-    precommit_blocks: Arc<RwLock<Vec<DataBlock>>>,
     fragment_id: Arc<AtomicUsize>,
 }
 
@@ -85,7 +83,6 @@ impl QueryContext {
             partition_queue: Arc::new(RwLock::new(VecDeque::new())),
             version: format!("DatabendQuery {}", *crate::version::DATABEND_COMMIT_VERSION),
            shared,
-            precommit_blocks: Arc::new(RwLock::new(Vec::new())),
            fragment_id: Arc::new(AtomicUsize::new(0)),
        })
    }
@@ -226,20 +223,6 @@ impl TableContext for QueryContext {
     fn get_result_progress_value(&self) -> ProgressValues {
         self.shared.result_progress.as_ref().get_values()
     }
-    // Steal n partitions from the partition pool by the pipeline worker.
-    // This also can steal the partitions from distributed node.
-    fn try_get_partitions(&self, num: u64) -> Result<Partitions> {
-        let mut partitions = vec![];
-        for _ in 0..num {
-            match self.partition_queue.write().pop_back() {
-                None => break,
-                Some(partition) => {
-                    partitions.push(partition);
-                }
-            }
-        }
-        Ok(partitions)
-    }
 
     fn try_get_part(&self) -> Option<PartInfoPtr> {
         self.partition_queue.write().pop_front()
@@ -334,15 +317,10 @@ impl TableContext for QueryContext {
         self.shared.dal_ctx.as_ref()
     }
     fn push_precommit_block(&self, block: DataBlock) {
-        let mut blocks = self.precommit_blocks.write();
-        blocks.push(block);
+        self.shared.push_precommit_block(block)
     }
     fn consume_precommit_blocks(&self) -> Vec<DataBlock> {
-        let mut blocks = self.precommit_blocks.write();
-
-        let mut swaped_precommit_blocks = vec![];
-        std::mem::swap(&mut *blocks, &mut swaped_precommit_blocks);
-        swaped_precommit_blocks
+        self.shared.consume_precommit_blocks()
     }
     fn try_get_function_context(&self) -> Result<FunctionContext> {
         let tz = self.get_settings().get_timezone()?;
@@ -382,14 +360,6 @@ impl TableContext for QueryContext {
     fn get_processes_info(&self) -> Vec<ProcessInfo> {
         SessionManager::instance().processes_info()
     }
-
-    fn get_runtime(&self) -> Result<Arc<Runtime>> {
-        self.shared.try_get_runtime()
-    }
-
-    fn clone_inner(&self) -> Arc<dyn TableContext> {
-        QueryContext::create_from_shared(self.shared.clone())
-    }
 }
 
 impl TrySpawn for QueryContext {
16 changes: 16 additions & 0 deletions src/query/service/src/sessions/query_ctx_shared.rs
@@ -22,6 +22,7 @@ use std::sync::Weak;
 use common_base::base::Progress;
 use common_base::base::Runtime;
 use common_contexts::DalContext;
+use common_datablocks::DataBlock;
 use common_exception::ErrorCode;
 use common_exception::Result;
 use common_meta_types::UserInfo;
@@ -79,6 +80,7 @@ pub struct QueryContextShared {
     pub(in crate::sessions) catalog_manager: Arc<CatalogManager>,
     pub(in crate::sessions) storage_operator: StorageOperator,
     pub(in crate::sessions) executor: Arc<RwLock<Weak<PipelineExecutor>>>,
+    pub(in crate::sessions) precommit_blocks: Arc<RwLock<Vec<DataBlock>>>,
 }
 
 impl QueryContextShared {
@@ -108,6 +110,7 @@ impl QueryContextShared {
             auth_manager: AuthMgr::create(config).await?,
             affect: Arc::new(Mutex::new(None)),
             executor: Arc::new(RwLock::new(Weak::new())),
+            precommit_blocks: Arc::new(RwLock::new(vec![])),
         }))
     }
 
@@ -297,4 +300,17 @@ impl QueryContextShared {
         let mut executor = self.executor.write();
         *executor = weak_ptr;
     }
+
+    pub fn push_precommit_block(&self, block: DataBlock) {
+        let mut blocks = self.precommit_blocks.write();
+        blocks.push(block);
+    }
+
+    pub fn consume_precommit_blocks(&self) -> Vec<DataBlock> {
+        let mut blocks = self.precommit_blocks.write();
+
+        let mut swaped_precommit_blocks = vec![];
+        std::mem::swap(&mut *blocks, &mut swaped_precommit_blocks);
+        swaped_precommit_blocks
+    }
 }
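
The precommit-block buffer now lives in QueryContextShared instead of in each QueryContext, so every context handle created over the same shared state (presumably what QueryContext::create_from does in the exchange-manager change above) pushes into and consumes from one buffer. A minimal standalone sketch of the pattern, mirroring the swap-style consume in the diff above (plain Rust, not databend code; Shared, Ctx, and the DataBlock alias are illustrative stand-ins):

    use std::sync::{Arc, RwLock};

    // Illustrative stand-in for a DataBlock.
    type DataBlock = String;

    // Illustrative stand-in for QueryContextShared: one buffer per query.
    #[derive(Default)]
    struct Shared {
        precommit_blocks: RwLock<Vec<DataBlock>>,
    }

    impl Shared {
        fn push_precommit_block(&self, block: DataBlock) {
            self.precommit_blocks.write().unwrap().push(block);
        }

        // Equivalent to the swap-based consume above: hand back everything
        // pushed so far and leave the buffer empty.
        fn consume_precommit_blocks(&self) -> Vec<DataBlock> {
            std::mem::take(&mut *self.precommit_blocks.write().unwrap())
        }
    }

    // Illustrative stand-in for QueryContext: a thin handle over the shared state.
    struct Ctx {
        shared: Arc<Shared>,
    }

    impl Ctx {
        fn create_from(other: &Ctx) -> Ctx {
            Ctx { shared: other.shared.clone() }
        }
    }

    fn main() {
        let coordinator = Ctx { shared: Arc::new(Shared::default()) };
        // A fragment pipeline gets its own context handle, but the same shared buffer.
        let fragment = Ctx::create_from(&coordinator);

        fragment.shared.push_precommit_block("block from fragment".to_string());

        // With per-context buffers, blocks pushed through one handle were invisible
        // to the other; with shared storage the coordinator sees them.
        assert_eq!(coordinator.shared.consume_precommit_blocks().len(), 1);
        assert!(coordinator.shared.consume_precommit_blocks().is_empty());
    }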
4 changes: 1 addition & 3 deletions src/query/storages/fuse/src/operations/read.rs
@@ -74,7 +74,7 @@ impl FuseTable {
     #[inline]
     pub fn do_read2(
         &self,
-        mut ctx: Arc<dyn TableContext>,
+        ctx: Arc<dyn TableContext>,
         plan: &ReadDataSourcePlan,
         pipeline: &mut Pipeline,
     ) -> Result<()> {
@@ -87,8 +87,6 @@ impl FuseTable {
         }
 
         if !lazy_init_segments.is_empty() {
-            ctx = ctx.clone_inner();
-
             let table_info = self.table_info.clone();
             let push_downs = plan.push_downs.clone();
             let query_ctx = ctx.clone();
14 changes: 0 additions & 14 deletions src/query/storages/fuse/src/operations/read_partitions.rs
@@ -85,20 +85,6 @@ impl FuseTable {
         }
     }
 
-    pub fn sync_prune_snapshot_blocks(
-        ctx: Arc<dyn TableContext>,
-        push_downs: Option<Extras>,
-        table_info: TableInfo,
-        segments_location: Vec<Location>,
-    ) -> Result<(Statistics, Partitions)> {
-        let block_metas =
-            BlockPruner::sync_prune(&ctx, table_info.schema(), &push_downs, segments_location)?
-                .into_iter()
-                .map(|(_, v)| v)
-                .collect::<Vec<_>>();
-        Self::read_partitions_with_metas(ctx, table_info.schema(), push_downs, block_metas, 0)
-    }
-
     pub async fn prune_snapshot_blocks(
         ctx: Arc<dyn TableContext>,
         push_downs: Option<Extras>,
