ISSUES-468 support query stage level shuffle data #469

Merged
Merged 90 commits into master from support/shuffle_stage on May 31, 2021

Conversation

zhang2014
Member

@zhang2014 zhang2014 commented May 4, 2021

Summary

Support query stage level shuffle data [Initial stage]

TODO

  • RFC document
  • Experimental implementation

Changelog

  • New Feature

Related Issues

fixes #468
fixes #614
related #440

@github-actions github-actions bot added the A-query Area: databend query label May 4, 2021
@zhang2014 zhang2014 force-pushed the support/shuffle_stage branch from cda5446 to 29d28c5 on May 4, 2021 09:26
@zhang2014 zhang2014 force-pushed the support/shuffle_stage branch from 8f78940 to d0ecc83 on May 6, 2021 13:59
use common_arrow::arrow::alloc::NativeType;

#[test]
fn test_scatter_primitive_data() -> Result<()> {
Member

Great work!

let mut block_columns = vec![];
for scattered_column in &scattered_columns[begin_index..end_index] {
match scattered_column {
None => panic!(""),
Member

Why not return an Err?
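As an illustration of the reviewer's suggestion, here is a minimal sketch of how the panic could be replaced with an error. ErrorCode::LogicalError and the message are assumptions for the example, not the code that was merged.

// Sketch only: surface a missing scattered column as an error instead of panicking.
let mut block_columns = vec![];
for scattered_column in &scattered_columns[begin_index..end_index] {
    match scattered_column {
        None => {
            return Err(ErrorCode::LogicalError(
                "Scattered column is unexpectedly missing",
            ));
        }
        Some(column) => block_columns.push(column.clone()),
    }
}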

@codecov-commenter

codecov-commenter commented May 15, 2021

Codecov Report

Merging #469 (a4eff0f) into master (198b6c2) will decrease coverage by 1%.
The diff coverage is 75%.


@@           Coverage Diff            @@
##           master    #469     +/-   ##
========================================
- Coverage      80%     78%     -2%     
========================================
  Files         289     302     +13     
  Lines       14255   16297   +2042     
========================================
+ Hits        11406   12806   +1400     
- Misses       2849    3491    +642     
Impacted Files Coverage Δ
common/datavalues/src/data_array_hash.rs 0% <0%> (ø)
common/functions/src/function_factory.rs 93% <ø> (ø)
common/functions/src/hashes/siphash.rs 0% <0%> (ø)
common/planners/src/plan_builder.rs 84% <ø> (+<1%) ⬆️
common/planners/src/plan_explain.rs 0% <0%> (ø)
common/planners/src/plan_insert_into.rs 0% <0%> (ø)
common/planners/src/plan_rewriter.rs 46% <0%> (-5%) ⬇️
common/planners/src/plan_rewriter_test.rs 84% <ø> (-1%) ⬇️
common/planners/src/plan_use_database.rs 0% <0%> (ø)
fusequery/query/src/api/rpc/flight_client_new.rs 0% <0%> (ø)
... and 130 more

Continue to review full report at Codecov.

Legend
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 198b6c2...a4eff0f. Read the comment docs.

@zhang2014 zhang2014 requested review from BohuTANG and sundy-li May 30, 2021 12:31
}
}

let mut scattered_blocks = vec![];
Member

Better to use Vec::with_capacity?
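For reference, a small self-contained example of the Vec::with_capacity pattern the reviewer is pointing at (a generic illustration, not the PR's code):

// When the final length is known up front, pre-allocating avoids repeated
// reallocation while elements are pushed.
fn collect_squares(n: usize) -> Vec<usize> {
    let mut out = Vec::with_capacity(n);
    for i in 0..n {
        out.push(i * i);
    }
    out
}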

stdout_file = os.path.join(suite_tmp_dir, name) + file_suffix + '.stdout'
stderr_file = os.path.join(suite_tmp_dir, name) + file_suffix + '.stderr'

if args.mode == 'cluster' and os.path.isfile(cluster_result_file):
Member

Good idea

curl http://127.0.0.1:8081/v1/cluster/add -X POST -H "Content-Type: application/json" -d '{"name":"cluster1","address":"0.0.0.0:9091", "priority":3, "cpus":8}'
curl http://127.0.0.1:8081/v1/cluster/add -X POST -H "Content-Type: application/json" -d '{"name":"cluster2","address":"0.0.0.0:9092", "priority":3, "cpus":8}'
curl http://127.0.0.1:8081/v1/cluster/add -X POST -H "Content-Type: application/json" -d '{"name":"cluster3","address":"0.0.0.0:9093", "priority":1, "cpus":8}'
curl http://127.0.0.1:8081/v1/cluster/add -X POST -H "Content-Type: application/json" -d '{"name":"cluster1","address":"127.0.0.1:9091", "priority":3, "cpus":8}'
Member

By the way, the cpus field is not used anymore.

scattered_columns.resize_with(scatter_size * columns_size, || None);

for column_index in 0..columns_size {
let column = block.column(column_index).to_array()?;
Member

For constant columns, we could have a faster path.

Member Author

Let's optimize it in another patch: #658
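For readers curious what such a fast path could look like, here is a hypothetical sketch (names and shapes are illustrative, not the project's API): a constant column never needs a per-row scatter kernel, because every output partition receives the same value and only the row count differs.

// Illustrative only: "scatter" a constant column by pairing the single value
// with each partition's row count, skipping any per-row work.
fn scatter_constant(value: i64, rows_per_partition: &[usize]) -> Vec<(i64, usize)> {
    rows_per_partition
        .iter()
        .map(|rows| (value, *rows))
        .collect()
}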

PlanNode::Explain(v) => v.set_input(inputs[0]),
PlanNode::Select(v) => v.set_input(inputs[0]),
PlanNode::Sort(v) => v.set_input(inputs[0]),
_ => {}
Contributor

Maybe it's better to return an error for the other types of PlanNode; otherwise the problem is silently ignored here and only surfaces somewhere else.
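A hedged sketch of the suggested change, assuming the surrounding function is (or becomes) fallible; ErrorCode::UnImplement is used here only as a plausible error constructor:

// Sketch: report the unhandled plan node instead of silently ignoring it.
_ => {
    return Err(ErrorCode::UnImplement(
        "set_input is not implemented for this type of plan node",
    ));
}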

A query plan (or query execution plan) is a sequence of steps used to access data in DataFuse. It is built by PlanBuilder from the AST. We also use a tree to describe it (similar to the AST), but it differs from the AST in a few ways:

- A plan is serializable and deserializable.
- A plan is grammatically safe; we don't need to worry about syntax errors.
Contributor

Is this because the plan has passed the parser?

Member Author

yep

- In distributed mode, the tables to be queried are always distributed across different nodes
- For some scenarios, distributed processing is always efficient, such as GROUP BY with keys and JOIN
- For some scenarios, there is no way to process in a distributed manner, such as LIMIT and GROUP BY without keys
- To ensure fast computation, we need to coordinate the placement of computation and data.
Contributor

Does position mean location?

+--------------------------------------------------------------------+
| explain                                                            |
+--------------------------------------------------------------------+
| Projection: argMin(user, salary):UInt64 <-- execute in local node
Contributor

Which SQL statement is this explanation for?

Member Author

EXPLAIN SELECT argMin(user, salary)  FROM (SELECT sum(number) AS salary, number%3 AS user FROM numbers_local(1000000000) GROUP BY user);

numbers_local is a local table. Since numbers_local(1000000000) is very large (more than 500 MB, or 100 million rows), ScatterOptimizer considers it more appropriate to distribute it.
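A hypothetical sketch of that decision, using the thresholds mentioned above; the constant names and the function are illustrative, not ScatterOptimizer's actual code:

// Illustrative only: distribute the read when the estimated input is large.
const DISTRIBUTE_BYTES_THRESHOLD: u64 = 500 * 1024 * 1024; // ~500 MB
const DISTRIBUTE_ROWS_THRESHOLD: u64 = 100_000_000; // 100 million rows

fn should_distribute(read_bytes: u64, read_rows: u64) -> bool {
    read_bytes > DISTRIBUTE_BYTES_THRESHOLD || read_rows > DISTRIBUTE_ROWS_THRESHOLD
}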

GroupByMerge,
AggregatorMerge,
Expansive,
Convergent,
Contributor

How about the plan that preserves the cardinality?

Member Author

What does "preserves the cardinality" mean?

Ok(())
}

// Execute do_action.
Contributor

do_get?


match plan.group_expr.len() {
0 => {
// For the final state, we need to aggregate the data
Member

This comment is a bit confusing. Does it mean:
if there is no GROUP BY, we converge the data in the local node?
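For context, a hedged sketch of the distinction being discussed: with no GROUP BY keys every node holds a single partial aggregate state that must converge on one node for the final merge, whereas with keys the rows can be reshuffled by the hash of the group key and finalized per node. The helper names below are illustrative, not the PR's code.

// Illustrative only: pick the stage shape from the number of GROUP BY keys.
match plan.group_expr.len() {
    // No keys: one final aggregation, so all partial states converge locally.
    0 => build_convergent_stage(&plan),
    // With keys: reshuffle by the hash of _group_by_key (e.g. sipHash) and
    // let each node finalize its own buckets.
    _ => build_shuffled_stage(&plan),
}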

kind: StageKind::Normal,
scatters_expr: Expression::ScalarFunction {
op: String::from("sipHash"),
args: vec![Expression::Column(String::from("_group_by_key"))]
Member
@BohuTANG BohuTANG May 31, 2021

Cool, the reshuffle key is here 💯

use crate::sessions::SessionManagerRef;

#[derive(Debug)]
pub struct PrepareStageInfo(
Member
@BohuTANG BohuTANG May 31, 2021

Prefer to use a normal struct instead of a tuple struct for better readability.
In particular, the prepare_stage method is somewhat inconvenient to read.
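A minimal sketch of the named-struct alternative being requested; the field names are guesses for illustration, not the definition that was merged:

// Sketch only: named fields make call sites like prepare_stage self-documenting,
// unlike positional access on a tuple struct.
#[derive(Debug)]
pub struct PrepareStageInfo {
    pub query_id: String,
    pub stage_id: String,
    pub plan: PlanNode,
    pub scattered_node: Vec<String>,
    pub scatters_expression: Expression,
}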

num: usize
) -> Result<FlightScatter> {
let indices_expression_action = Expression::ScalarFunction {
op: String::from("modulo"),
Member
@BohuTANG BohuTANG May 31, 2021

Does it only scatter by hash?
If there are other scatter strategies in the future, it would be better to rename FlightScatter to FlightScatterByHash.

Member Author

Done.
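For context, a small self-contained sketch of the scatter-by-hash idea behind the rename: each row's hash is mapped to one of num outputs with a modulo, mirroring the modulo expression built above. The function is purely illustrative.

// Illustrative only: assign every row to a scatter bucket by hash % num.
fn scatter_indices(hashes: &[u64], num: u64) -> Vec<u64> {
    hashes.iter().map(|hash| hash % num).collect()
}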

scan.and_then(|scan| match scan {
PlanNode::Scan(ref scan) => table
.read_plan(self.ctx.clone(), scan)
.read_plan(self.ctx.clone(), scan, self.ctx.get_max_threads()? as usize)
Member

We already have ctx, so why do we need to pass max_threads in?

Member Author

In standalone mode, the maximum concurrency is max_threads.
In cluster mode, the maximum concurrency should be max_threads * nodes_size.
We re-plan the read in the plan_scheduler.

let table = ctx.get_table(&plan.db, &plan.table)?;
if !table.is_local() {
    let new_partitions_size = ctx.get_max_threads()? as usize * cluster_nodes.len();
    let new_read_source_plan = table.read_plan(ctx.clone(), &*plan.scan_plan, new_partitions_size)?; 
    // We always put adjacent partitions in the same node
    let new_partitions = &new_read_source_plan.partitions;
    let mut nodes_partitions = HashMap::new();
    let partitions_pre_node = new_partitions.len() / cluster_nodes.len();

reference: https://github.com/datafuselabs/datafuse/pull/469/files#diff-d9bf8b037f42cfc24f1b4e8138e966f80aae1b68deb6fe82ceb0d2ad7a93a806R236

@databend-bot
Member

CI Passed
Reviewer Approved
Let's Merge

@databend-bot databend-bot merged commit 1d5d6e9 into databendlabs:master May 31, 2021
@BohuTANG BohuTANG mentioned this pull request Jun 7, 2021
@BohuTANG BohuTANG added this to the v0.5 milestone Jun 22, 2021
Maricaya pushed a commit to Maricaya/databend that referenced this pull request Sep 29, 2024
databendlabs#469)

* add RouteHintGenerator

* add in_active_transaction

* reset route hint

* chore: rename to route hint

* handle the route hint from query response
Labels
A-query Area: databend query pr-feature this PR introduces a new feature to the codebase
Development

Successfully merging this pull request may close these issues.

Flight error broke tests
Support shuffle data for query stage
6 participants