Merge pull request #2660 from sundy-li/column-prune
Make push_downs work in table's read
databend-bot authored Nov 5, 2021
2 parents 8a6d88a + 794ed16 commit 0224fd2
Showing 60 changed files with 435 additions and 405 deletions.
26 changes: 19 additions & 7 deletions common/planners/src/plan_display_indent.rs
@@ -213,14 +213,26 @@ impl<'a> PlanNodeIndentFormatDisplay<'a> {
plan.statistics.read_bytes,
)?;

if plan.push_downs.is_some() {
let extras = plan.push_downs.clone().unwrap();
write!(f, ", push_downs: [")?;
if extras.limit.is_some() {
write!(f, "limit: {}", extras.limit.unwrap())?;
write!(f, ", order_by: {:?}", extras.order_by)?;
if let Some(p) = &plan.push_downs {
if p.limit.is_some() || p.projection.is_some() {
write!(f, ", push_downs: [")?;
let mut comma = false;
if p.projection.is_some() {
write!(f, "projections: {:?}", p.projection.clone().unwrap())?;
comma = true;
}

if p.limit.is_some() {
if comma {
write!(f, ", ")?;
}

write!(f, "limit: {:?}", p.limit.unwrap())?;
write!(f, ", order_by: {:?}", p.order_by)?;
}

write!(f, "]")?;
}
write!(f, "]")?;
}
Ok(())
}
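For reference, a minimal, self-contained sketch of the new display logic above, using a mock Extras that carries only the projection/limit/order_by fields touched in this hunk (an assumption for illustration; the real type in common_planners has more fields):

```rust
use std::fmt::Write;

// Mock of the push-down extras used in this hunk (assumption: the real
// common_planners::Extras is richer than these three fields).
struct Extras {
    projection: Option<Vec<usize>>,
    limit: Option<usize>,
    order_by: Vec<String>,
}

// Mirrors the formatting branch added in plan_display_indent.rs.
fn format_push_downs(p: &Extras) -> String {
    let mut f = String::new();
    if p.limit.is_some() || p.projection.is_some() {
        write!(f, ", push_downs: [").unwrap();
        let mut comma = false;
        if let Some(projection) = &p.projection {
            write!(f, "projections: {:?}", projection).unwrap();
            comma = true;
        }
        if let Some(limit) = p.limit {
            if comma {
                write!(f, ", ").unwrap();
            }
            write!(f, "limit: {:?}, order_by: {:?}", limit, p.order_by).unwrap();
        }
        write!(f, "]").unwrap();
    }
    f
}

fn main() {
    let p = Extras {
        projection: Some(vec![0, 2]),
        limit: Some(10),
        order_by: vec![],
    };
    // Prints: , push_downs: [projections: [0, 2], limit: 10, order_by: []]
    println!("{}", format_push_downs(&p));
}
```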
8 changes: 2 additions & 6 deletions query/src/api/http/v1/logs.rs
@@ -65,14 +65,10 @@ async fn select_table(sessions: SessionManagerRef) -> Result<String> {

async fn execute_query(context: DatabendQueryContextRef) -> Result<SendableDataBlockStream> {
let tracing_table = context.get_table("system", "tracing")?;
let io_ctx = context.get_single_node_table_io_context()?;
let io_ctx = context.get_cluster_table_io_context()?;
let io_ctx = Arc::new(io_ctx);

let tracing_table_read_plan = tracing_table.read_plan(
io_ctx.clone(),
None,
Some(context.get_settings().get_max_threads()? as usize),
)?;
let tracing_table_read_plan = tracing_table.read_plan(io_ctx.clone(), None)?;

tracing_table.read(io_ctx, &tracing_table_read_plan).await
}
62 changes: 41 additions & 21 deletions query/src/catalogs/table.rs
@@ -13,6 +13,7 @@
// limitations under the License.

use std::any::Any;
use std::collections::BTreeMap;
use std::sync::Arc;

use common_context::TableIOContext;
@@ -57,12 +58,16 @@ pub trait Table: Sync + Send {

fn get_table_info(&self) -> &TableInfo;

/// whether column prune(projection) can help in table read
fn benefit_column_prune(&self) -> bool {
false
}

// defaults to generate one single part and empty statistics
fn read_partitions(
&self,
_io_ctx: Arc<TableIOContext>,
_push_downs: Option<Extras>,
_partition_num_hint: Option<usize>,
) -> Result<(Statistics, Partitions)> {
Ok((Statistics::default(), vec![Part {
name: "".to_string(),
@@ -108,11 +113,11 @@ pub trait Table: Sync + Send {
pub type TablePtr = Arc<dyn Table>;

pub trait ToReadDataSourcePlan {
/// Real read_plan to access partitions/push_downs
fn read_plan(
&self,
io_ctx: Arc<TableIOContext>,
push_downs: Option<Extras>,
partition_num_hint: Option<usize>,
) -> Result<ReadDataSourcePlan>;
}

@@ -121,32 +126,29 @@ impl ToReadDataSourcePlan for dyn Table {
&self,
io_ctx: Arc<TableIOContext>,
push_downs: Option<Extras>,
partition_num_hint: Option<usize>,
) -> Result<ReadDataSourcePlan> {
let (statistics, parts) =
self.read_partitions(io_ctx, push_downs.clone(), partition_num_hint)?;
let (statistics, parts) = self.read_partitions(io_ctx, push_downs.clone())?;
let table_info = self.get_table_info();

let description = if statistics.read_rows > 0 {
format!(
"(Read from {} table, {} Read Rows:{}, Read Bytes:{})",
table_info.desc,
if statistics.is_exact {
"Exactly"
} else {
"Approximately"
},
statistics.read_rows,
statistics.read_bytes,
)
} else {
format!("(Read from {} table)", table_info.desc)
let description = get_description(table_info, &statistics);

let scan_fields = match (self.benefit_column_prune(), &push_downs) {
(true, Some(push_downs)) => match &push_downs.projection {
Some(projection) if projection.len() < table_info.schema().fields().len() => {
let fields = projection
.iter()
.map(|i| table_info.schema().field(*i).clone());

Some((projection.iter().cloned().zip(fields)).collect::<BTreeMap<_, _>>())
}
_ => None,
},
_ => None,
};

Ok(ReadDataSourcePlan {
table_info: table_info.clone(),

scan_fields: None,
scan_fields,
parts,
statistics,
description,
@@ -155,3 +157,21 @@ impl ToReadDataSourcePlan for dyn Table {
})
}
}

fn get_description(table_info: &TableInfo, statistics: &Statistics) -> String {
if statistics.read_rows > 0 {
format!(
"(Read from {} table, {} Read Rows:{}, Read Bytes:{})",
table_info.desc,
if statistics.is_exact {
"Exactly"
} else {
"Approximately"
},
statistics.read_rows,
statistics.read_bytes,
)
} else {
format!("(Read from {} table)", table_info.desc)
}
}
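The scan_fields construction above is the core of the column prune: only when a table opts in via benefit_column_prune and the pushed-down projection is strictly narrower than the full schema does the plan carry a reduced field set. A self-contained sketch of the same idea, with simplified stand-ins for the schema and field types (the real ones come from common_datavalues):

```rust
use std::collections::BTreeMap;

// Simplified stand-ins for the real schema/field types (assumption: illustration only).
#[derive(Clone, Debug)]
struct Field {
    name: String,
}

struct Schema {
    fields: Vec<Field>,
}

impl Schema {
    fn field(&self, i: usize) -> &Field {
        &self.fields[i]
    }
}

/// Mirror of the scan_fields logic: map original column index -> field,
/// but only if pruning can help and the projection is a strict subset.
fn scan_fields(
    benefit_column_prune: bool,
    projection: Option<&[usize]>,
    schema: &Schema,
) -> Option<BTreeMap<usize, Field>> {
    match (benefit_column_prune, projection) {
        (true, Some(projection)) if projection.len() < schema.fields.len() => Some(
            projection
                .iter()
                .map(|&i| (i, schema.field(i).clone()))
                .collect(),
        ),
        _ => None,
    }
}

fn main() {
    let schema = Schema {
        fields: vec![
            Field { name: "a".into() },
            Field { name: "b".into() },
            Field { name: "c".into() },
        ],
    };
    // Keeps columns 0 and 2 only; a full projection (or a table that cannot
    // benefit from pruning) yields None and the plan falls back to the full schema.
    let pruned = scan_fields(true, Some(&[0, 2][..]), &schema);
    println!("{:?}", pruned);
}
```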
6 changes: 1 addition & 5 deletions query/src/datasources/database/system/clusters_table_test.rs
@@ -29,11 +29,7 @@ async fn test_clusters_table() -> Result<()> {

let io_ctx = ctx.get_cluster_table_io_context()?;
let io_ctx = Arc::new(io_ctx);
let source_plan = table.read_plan(
io_ctx.clone(),
None,
Some(ctx.get_settings().get_max_threads()? as usize),
)?;
let source_plan = table.read_plan(io_ctx.clone(), None)?;

let stream = table.read(io_ctx, &source_plan).await?;
let result = stream.try_collect::<Vec<_>>().await?;
8 changes: 2 additions & 6 deletions query/src/datasources/database/system/columns_table_test.rs
@@ -27,13 +27,9 @@ async fn test_columns_table() -> Result<()> {
let ctx = crate::tests::try_create_context()?;

let table: Arc<dyn Table> = Arc::new(ColumnsTable::create(1));
let io_ctx = ctx.get_single_node_table_io_context()?;
let io_ctx = ctx.get_cluster_table_io_context()?;
let io_ctx = Arc::new(io_ctx);
let source_plan = table.read_plan(
io_ctx.clone(),
None,
Some(ctx.get_settings().get_max_threads()? as usize),
)?;
let source_plan = table.read_plan(io_ctx.clone(), None)?;

let stream = table.read(io_ctx, &source_plan).await?;
let result = stream.try_collect::<Vec<_>>().await?;
8 changes: 2 additions & 6 deletions query/src/datasources/database/system/configs_table_test.rs
@@ -32,13 +32,9 @@ async fn test_configs_table() -> Result<()> {
ctx.get_settings().set_max_threads(8)?;

let table: Arc<dyn Table> = Arc::new(ConfigsTable::create(1));
let io_ctx = ctx.get_single_node_table_io_context()?;
let io_ctx = ctx.get_cluster_table_io_context()?;
let io_ctx = Arc::new(io_ctx);
let source_plan = table.read_plan(
io_ctx.clone(),
None,
Some(ctx.get_settings().get_max_threads()? as usize),
)?;
let source_plan = table.read_plan(io_ctx.clone(), None)?;

let stream = table.read(io_ctx, &source_plan).await?;
let result = stream.try_collect::<Vec<_>>().await?;
8 changes: 2 additions & 6 deletions query/src/datasources/database/system/contributors_table_test.rs
@@ -26,13 +26,9 @@ use crate::datasources::database::system::ContributorsTable;
async fn test_contributors_table() -> Result<()> {
let ctx = crate::tests::try_create_context()?;
let table: Arc<dyn Table> = Arc::new(ContributorsTable::create(1));
let io_ctx = ctx.get_single_node_table_io_context()?;
let io_ctx = ctx.get_cluster_table_io_context()?;
let io_ctx = Arc::new(io_ctx);
let source_plan = table.read_plan(
io_ctx.clone(),
None,
Some(ctx.get_settings().get_max_threads()? as usize),
)?;
let source_plan = table.read_plan(io_ctx.clone(), None)?;

let stream = table.read(io_ctx, &source_plan).await?;
let result = stream.try_collect::<Vec<_>>().await?;
8 changes: 2 additions & 6 deletions query/src/datasources/database/system/credits_table_test.rs
@@ -27,13 +27,9 @@ async fn test_credits_table() -> Result<()> {
let ctx = crate::tests::try_create_context()?;

let table: Arc<dyn Table> = Arc::new(CreditsTable::create(1));
let io_ctx = ctx.get_single_node_table_io_context()?;
let io_ctx = ctx.get_cluster_table_io_context()?;
let io_ctx = Arc::new(io_ctx);
let source_plan = table.read_plan(
io_ctx.clone(),
None,
Some(ctx.get_settings().get_max_threads()? as usize),
)?;
let source_plan = table.read_plan(io_ctx.clone(), None)?;

let stream = table.read(io_ctx, &source_plan).await?;
let result = stream.try_collect::<Vec<_>>().await?;
8 changes: 2 additions & 6 deletions query/src/datasources/database/system/databases_table_test.rs
@@ -26,13 +26,9 @@ use crate::datasources::database::system::DatabasesTable;
async fn test_tables_table() -> Result<()> {
let ctx = crate::tests::try_create_context()?;
let table: Arc<dyn Table> = Arc::new(DatabasesTable::create(1));
let io_ctx = ctx.get_single_node_table_io_context()?;
let io_ctx = ctx.get_cluster_table_io_context()?;
let io_ctx = Arc::new(io_ctx);
let source_plan = table.read_plan(
io_ctx.clone(),
None,
Some(ctx.get_settings().get_max_threads()? as usize),
)?;
let source_plan = table.read_plan(io_ctx.clone(), None)?;

let stream = table.read(io_ctx, &source_plan).await?;
let result = stream.try_collect::<Vec<_>>().await?;
8 changes: 2 additions & 6 deletions query/src/datasources/database/system/functions_table_test.rs
@@ -26,13 +26,9 @@ use crate::datasources::database::system::FunctionsTable;
async fn test_functions_table() -> Result<()> {
let ctx = crate::tests::try_create_context()?;
let table: Arc<dyn Table> = Arc::new(FunctionsTable::create(1));
let io_ctx = ctx.get_single_node_table_io_context()?;
let io_ctx = ctx.get_cluster_table_io_context()?;
let io_ctx = Arc::new(io_ctx);
let source_plan = table.read_plan(
io_ctx.clone(),
None,
Some(ctx.get_settings().get_max_threads()? as usize),
)?;
let source_plan = table.read_plan(io_ctx.clone(), None)?;

let stream = table.read(io_ctx, &source_plan).await?;
let result = stream.try_collect::<Vec<_>>().await?;
8 changes: 2 additions & 6 deletions query/src/datasources/database/system/metrics_table_test.rs
@@ -29,13 +29,9 @@ async fn test_metrics_table() -> Result<()> {
init_default_metrics_recorder();
let ctx = crate::tests::try_create_context()?;
let table: Arc<dyn Table> = Arc::new(MetricsTable::create(1));
let io_ctx = ctx.get_single_node_table_io_context()?;
let io_ctx = ctx.get_cluster_table_io_context()?;
let io_ctx = Arc::new(io_ctx);
let source_plan = table.read_plan(
io_ctx.clone(),
None,
Some(ctx.get_settings().get_max_threads()? as usize),
)?;
let source_plan = table.read_plan(io_ctx.clone(), None)?;
metrics::counter!("test.test_metrics_table_count", 1);
metrics::histogram!("test.test_metrics_table_histogram", 1.0);

1 change: 0 additions & 1 deletion query/src/datasources/database/system/one_table.rs
@@ -69,7 +69,6 @@ impl Table for OneTable {
&self,
_io_ctx: Arc<TableIOContext>,
_push_downs: Option<Extras>,
_partition_num_hint: Option<usize>,
) -> Result<(Statistics, Partitions)> {
Ok((Statistics::new_exact(1, 1), vec![Part {
name: "".to_string(),
8 changes: 2 additions & 6 deletions query/src/datasources/database/system/settings_table_test.rs
@@ -29,13 +29,9 @@ async fn test_settings_table() -> Result<()> {
ctx.get_settings().set_max_threads(2)?;

let table: Arc<dyn Table> = Arc::new(SettingsTable::create(1));
let io_ctx = ctx.get_single_node_table_io_context()?;
let io_ctx = ctx.get_cluster_table_io_context()?;
let io_ctx = Arc::new(io_ctx);
let source_plan = table.read_plan(
io_ctx.clone(),
None,
Some(ctx.get_settings().get_max_threads()? as usize),
)?;
let source_plan = table.read_plan(io_ctx.clone(), None)?;

let stream = table.read(io_ctx, &source_plan).await?;
let result = stream.try_collect::<Vec<_>>().await?;
8 changes: 2 additions & 6 deletions query/src/datasources/database/system/tables_table_test.rs
@@ -26,13 +26,9 @@ use crate::datasources::database::system::TablesTable;
async fn test_tables_table() -> Result<()> {
let ctx = crate::tests::try_create_context()?;
let table: Arc<dyn Table> = Arc::new(TablesTable::create(1));
let io_ctx = ctx.get_single_node_table_io_context()?;
let io_ctx = ctx.get_cluster_table_io_context()?;
let io_ctx = Arc::new(io_ctx);
let source_plan = table.read_plan(
io_ctx.clone(),
None,
Some(ctx.get_settings().get_max_threads()? as usize),
)?;
let source_plan = table.read_plan(io_ctx.clone(), None)?;

let stream = table.read(io_ctx, &source_plan).await?;
let result = stream.try_collect::<Vec<_>>().await?;
8 changes: 2 additions & 6 deletions query/src/datasources/database/system/tracing_table_test.rs
@@ -26,13 +26,9 @@ use crate::datasources::database::system::TracingTable;
async fn test_tracing_table() -> Result<()> {
let ctx = crate::tests::try_create_context()?;
let table: Arc<dyn Table> = Arc::new(TracingTable::create(1));
let io_ctx = ctx.get_single_node_table_io_context()?;
let io_ctx = ctx.get_cluster_table_io_context()?;
let io_ctx = Arc::new(io_ctx);
let source_plan = table.read_plan(
io_ctx.clone(),
None,
Some(ctx.get_settings().get_max_threads()? as usize),
)?;
let source_plan = table.read_plan(io_ctx.clone(), None)?;

let stream = table.read(io_ctx, &source_plan).await?;
let result = stream.try_collect::<Vec<_>>().await?;
1 change: 0 additions & 1 deletion query/src/datasources/table/csv/csv_table.rs
@@ -79,7 +79,6 @@ impl Table for CsvTable {
&self,
io_ctx: Arc<TableIOContext>,
_push_downs: Option<Extras>,
_partition_num_hint: Option<usize>,
) -> Result<(Statistics, Partitions)> {
let start_line: usize = if self.has_header { 1 } else { 0 };
let file = &self.file;
10 changes: 4 additions & 6 deletions query/src/datasources/table/csv/csv_table_test.rs
@@ -61,10 +61,9 @@ async fn test_csv_table() -> Result<()> {
Arc::new(TableDataContext::default()),
)?;

let partitions = ctx.get_settings().get_max_threads()? as usize;
let io_ctx = ctx.get_single_node_table_io_context()?;
let io_ctx = ctx.get_cluster_table_io_context()?;
let io_ctx = Arc::new(io_ctx);
let source_plan = table.read_plan(io_ctx.clone(), Some(Extras::default()), Some(partitions))?;
let source_plan = table.read_plan(io_ctx.clone(), Some(Extras::default()))?;
ctx.try_set_partitions(source_plan.parts.clone())?;

let stream = table.read(io_ctx, &source_plan).await?;
@@ -123,10 +122,9 @@ async fn test_csv_table_parse_error() -> Result<()> {
Arc::new(TableDataContext::default()),
)?;

let partitions = ctx.get_settings().get_max_threads()? as usize;
let io_ctx = ctx.get_single_node_table_io_context()?;
let io_ctx = ctx.get_cluster_table_io_context()?;
let io_ctx = Arc::new(io_ctx);
let source_plan = table.read_plan(io_ctx.clone(), Some(Extras::default()), Some(partitions))?;
let source_plan = table.read_plan(io_ctx.clone(), Some(Extras::default()))?;
ctx.try_set_partitions(source_plan.parts.clone())?;

let stream = table.read(io_ctx, &source_plan).await?;
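With the partition-count hint gone, the only way for these call sites to influence the read is through the push-downs argument. A minimal sketch of how a caller could request column pruning, using a mock Extras carrying just the fields this diff touches (an assumption; the real struct lives in common_planners and may differ):

```rust
// Mock push-down extras; assumption: field names mirror the ones used in this diff.
#[derive(Clone, Debug, Default)]
struct Extras {
    projection: Option<Vec<usize>>,
    limit: Option<usize>,
}

fn main() {
    // Instead of `read_plan(io_ctx, Some(Extras::default()), Some(max_threads))`,
    // callers now pass the pruning request itself:
    let push_downs = Extras {
        projection: Some(vec![0, 1]), // read only the first two columns
        ..Extras::default()
    };
    // A table whose benefit_column_prune() returns true would translate this
    // into a reduced scan_fields map in the resulting ReadDataSourcePlan.
    println!("{:?}", push_downs);
}
```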
2 changes: 1 addition & 1 deletion query/src/datasources/table/fuse/index/min_max_test.rs
@@ -88,7 +88,7 @@ async fn test_min_max_index() -> Result<()> {
schema: test_schema.clone(),
input_stream: Arc::new(Mutex::new(Some(Box::pin(futures::stream::iter(blocks))))),
};
let io_ctx = Arc::new(ctx.get_single_node_table_io_context()?);
let io_ctx = Arc::new(ctx.get_cluster_table_io_context()?);
let da = io_ctx.get_data_accessor()?;
table.append_data(io_ctx.clone(), insert_into_plan).await?;
