fix: Limits are not applied correctly #14418
Conversation
error: use of deprecated constant `arrow::datatypes::MAX_DECIMAL_FOR_EACH_PRECISION`: Use MAX_DECIMAL128_FOR_EACH_PRECISION (note indexes are different)
--> datafusion/optimizer/src/unwrap_cast_in_comparison.rs:29:25
|
29 | DataType, TimeUnit, MAX_DECIMAL_FOR_EACH_PRECISION, MIN_DECIMAL_FOR_EACH_PRECISION,
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
= note: `-D deprecated` implied by `-D warnings`
= help: to override `-D warnings` add `#[allow(deprecated)]`
error: use of deprecated constant `arrow::datatypes::MIN_DECIMAL_FOR_EACH_PRECISION`: Use MIN_DECIMAL128_FOR_EACH_PRECISION (note indexes are different)
--> datafusion/optimizer/src/unwrap_cast_in_comparison.rs:29:57
|
29 | DataType, TimeUnit, MAX_DECIMAL_FOR_EACH_PRECISION, MIN_DECIMAL_FOR_EACH_PRECISION,
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
error: use of deprecated constant `arrow::datatypes::MIN_DECIMAL_FOR_EACH_PRECISION`: Use MIN_DECIMAL128_FOR_EACH_PRECISION (note indexes are different)
--> datafusion/optimizer/src/unwrap_cast_in_comparison.rs:372:13
|
372 | MIN_DECIMAL_FOR_EACH_PRECISION[*precision as usize - 1],
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
error: use of deprecated constant `arrow::datatypes::MAX_DECIMAL_FOR_EACH_PRECISION`: Use MAX_DECIMAL128_FOR_EACH_PRECISION (note indexes are different)
--> datafusion/optimizer/src/unwrap_cast_in_comparison.rs:373:13
|
373 | MAX_DECIMAL_FOR_EACH_PRECISION[*precision as usize - 1],
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
The clippy complaint is not related to this PR. It will be fixed after:
Tests look good to me. I have not touched much of the optimizer, so I'm not confident reviewing the change itself. Your explanation is great; I think it is important to record for posterity what the root cause was.
Thanks for the patch. We may be able to do this without checking for specific operators (like
Thank you @adriangb for the review. This only happens when we sort with a limit, because of the following logic:
// If we have a non-limit operator with fetch capability, update global
// state as necessary:
if pushdown_plan.fetch().is_some() {
if global_state.fetch.is_none() {
global_state.satisfied = true;
}
(global_state.skip, global_state.fetch) = combine_limit(
global_state.skip,
global_state.fetch,
0,
pushdown_plan.fetch(),
);
}
When we sort with a limit, the following steps cause the bug:
if pushdown_plan.supports_limit_pushdown() {
if !combines_input_partitions(&pushdown_plan) {
// We have information in the global state and the plan pushes down,
// continue:
Ok((Transformed::no(pushdown_plan), global_state))
} else if let Some(plan_with_fetch) = pushdown_plan.with_fetch(skip_and_fetch) {
// This plan is combining input partitions, so we need to add the
// fetch info to plan if possible. If not, we must add a `LimitExec`
// with the information from the global state.
let mut new_plan = plan_with_fetch;
// Execution plans can't (yet) handle skip, so if we have one,
// we still need to add a global limit
if global_state.skip > 0 {
new_plan =
add_global_limit(new_plan, global_state.skip, global_state.fetch);
}
global_state.fetch = skip_and_fetch;
global_state.skip = 0;
global_state.satisfied = true;
Ok((Transformed::yes(new_plan), global_state))
} else if global_state.satisfied {
// If the plan is already satisfied, do not add a limit:
Ok((Transformed::no(pushdown_plan), global_state))
} else {
global_state.satisfied = true;
Ok((
Transformed::yes(add_limit(
pushdown_plan,
global_state.skip,
global_fetch,
)),
global_state,
))
}
}
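Part of why the branch above falls through is that CoalescePartitionsExec did not override with_fetch at this point, so pushdown_plan.with_fetch(skip_and_fetch) returns None and the global_state.satisfied check decides the outcome. Below is a minimal sketch of the trait default, paraphrased from DataFusion's ExecutionPlan trait rather than quoted verbatim:

```rust
// Sketch of the ExecutionPlan::with_fetch default behaviour (paraphrased):
// an operator that cannot enforce a fetch itself opts out by returning None.
// CoalescePartitionsExec relied on this default before the follow-up work
// discussed later in this thread.
fn with_fetch(&self, _limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
    None
}
```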
Thank you @ozankabak for the review. Yes, I believe adding a with_fetch API for CoalescePartitionsExec may also solve this issue, and I also added it as a to-do in the PR comments. |
Generally LGTM, I agree with @ozankabak's suggestion.
Maybe you can also file an issue.
if limit_exec.input().as_any().is::<CoalescePartitionsExec>() {
    // If the child is a `CoalescePartitionsExec`, we should not remove the limit
    // the push_down through the `CoalescePartitionsExec` to each partition will not guarantee the limit.
    // todo we may have a better solution if we can support with_fetch for limit inside CoalescePartitionsExec.
TODO:
Thank you @xudong963 for the review; I changed the comments and added a follow-up issue:
#14446
Thank you @xudong963 for the review, added the follow-up:
@@ -146,6 +146,15 @@ pub fn pushdown_limit_helper(
    global_state.skip = skip;
    global_state.fetch = fetch;
if limit_exec.input().as_any().is::<CoalescePartitionsExec>() { |
While I agree with the suggestion to check via the API, please also check with the combines_input_partitions() helper function so that SortPreservingMerge can be affected as well.
In the optimizer logic, we remove the Limit operators first and then add them at the lowest possible point in the plan; if the plan is "satisfied", we drop the limit information. So if the plan is combining input partitions, we only add a global limit if skip information is there; maybe we can identify whether the local limits are enough and then decide to add the global limit there. But in the end, I think rather than adding a global limit, we should be able to limit in the CoalescePartitionsExec or in SortPreservingMerge so that it won't unnecessarily push more data.
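For reference, here is a rough sketch of what a check based on that helper could look like. The crate paths and the helper body are assumptions based on the snippets quoted in this thread, not the final code in this PR:

```rust
use std::sync::Arc;

use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use datafusion_physical_plan::ExecutionPlan;

// Assumed body of the helper referenced above: true for operators that merge
// their input partitions into a single output stream.
fn combines_input_partitions(exec: &Arc<dyn ExecutionPlan>) -> bool {
    let exec = exec.as_any();
    exec.is::<CoalescePartitionsExec>() || exec.is::<SortPreservingMergeExec>()
}

// Hypothetical use at the call site shown in the diff above:
// if combines_input_partitions(limit_exec.input()) { /* keep the global limit */ }
```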
// Execution plans can't (yet) handle skip, so if we have one,
// we still need to add a global limit
if global_state.skip > 0 {
new_plan =
add_global_limit(new_plan, global_state.skip, global_state.fetch);
}
Thank you @mertak-synnada for the review:
While I agree with the suggestion to check via the API, please also check with the combines_input_partitions() helper function so that SortPreservingMerge can be affected as well.
I agree. I already checked SortPreservingMergeExec; it supports with_fetch() and fetch(), so I think it is not affected:
impl SortPreservingMergeExec {
/// Create a new sort execution plan
pub fn new(expr: LexOrdering, input: Arc<dyn ExecutionPlan>) -> Self {
let cache = Self::compute_properties(&input, expr.clone());
Self {
input,
expr,
metrics: ExecutionPlanMetricsSet::new(),
fetch: None,
cache,
enable_round_robin_repartition: true,
}
}
/// Sets the number of rows to fetch
pub fn with_fetch(mut self, fetch: Option<usize>) -> Self {
self.fetch = fetch;
self
}
/// Sets the selection strategy of tied winners of the loser tree algorithm
///
/// If true (the default) equal output rows are placed in the merged stream
/// in round robin fashion. This approach consumes input streams at more
/// even rates when there are many rows with the same sort key.
///
/// If false, equal output rows are always placed in the merged stream in
/// the order of the inputs, resulting in potentially slower execution but a
/// stable output order.
pub fn with_round_robin_repartition(
mut self,
enable_round_robin_repartition: bool,
) -> Self {
self.enable_round_robin_repartition = enable_round_robin_repartition;
self
}
/// Input schema
pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
&self.input
}
/// Sort expressions
pub fn expr(&self) -> &LexOrdering {
self.expr.as_ref()
}
/// Fetch
pub fn fetch(&self) -> Option<usize> {
self.fetch
}
/// Creates the cache object that stores the plan properties
/// such as schema, equivalence properties, ordering, partitioning, etc.
fn compute_properties(
input: &Arc<dyn ExecutionPlan>,
ordering: LexOrdering,
) -> PlanProperties {
let mut eq_properties = input.equivalence_properties().clone();
eq_properties.clear_per_partition_constants();
eq_properties.add_new_orderings(vec![ordering]);
PlanProperties::new(
eq_properties, // Equivalence Properties
Partitioning::UnknownPartitioning(1), // Output Partitioning
input.pipeline_behavior(), // Pipeline Behavior
input.boundedness(), // Boundedness
)
}
}
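As a quick illustration of the API quoted above (the ordering and input values are placeholders, not taken from this PR):

```rust
// Hypothetical usage: ask the merge operator itself to stop after 10 rows.
// `ordering: LexOrdering` and `partially_sorted_input: Arc<dyn ExecutionPlan>`
// are assumed to exist already.
let merge = SortPreservingMergeExec::new(ordering, partially_sorted_input)
    .with_fetch(Some(10));
assert_eq!(merge.fetch(), Some(10));
```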
But in the end, I think rather than adding a global limit, we should be able to limit in the CoalescePartitionsExec or in SortPreservingMerge so that it won't unnecessarily push more data.
I totally agree with this! So I created follow-up #14446 to support a limit in CoalescePartitionsExec; SortPreservingMerge already supports this, per the code above.
So if the plan is combining input partitions, we only add a global limit if skip information is there; maybe we can identify whether the local limits are enough and then decide to add the global limit there.
This is a good point; we can create another issue to try to improve this!
Updated. I confirmed SortPreservingMerge works well with fetch:
# Check output plan, expect no "output_ordering" clause in the physical_plan -> ParquetExec:
query TT
explain with selection as (
select *
from test_table
ORDER BY string_col, int_col limit 1
)
select 1 as foo
from selection
order by string_col
limit 1000;
----
logical_plan
01)Projection: foo
02)--Sort: selection.string_col ASC NULLS LAST, fetch=1000
03)----Projection: Int64(1) AS foo, selection.string_col
04)------SubqueryAlias: selection
05)--------Projection: test_table.string_col
06)----------Sort: test_table.string_col ASC NULLS LAST, test_table.int_col ASC NULLS LAST, fetch=1
07)------------TableScan: test_table projection=[int_col, string_col]
physical_plan
01)ProjectionExec: expr=[foo@0 as foo]
02)--ProjectionExec: expr=[1 as foo, string_col@0 as string_col]
03)----ProjectionExec: expr=[string_col@1 as string_col]
04)------SortPreservingMergeExec: [string_col@1 ASC NULLS LAST, int_col@0 ASC NULLS LAST], fetch=1
05)--------SortExec: TopK(fetch=1), expr=[string_col@1 ASC NULLS LAST, int_col@0 ASC NULLS LAST], preserve_partitioning=[true]
06)----------ParquetExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_table/0.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_table/1.parquet]]}, projection=[int_col, string_col]
Thanks!
Thank you @zhuqi-lucas, @mertak-synnada and @xudong963. This looks good to me now, and I'm merging it. I guess @mertak-synnada will open a follow-up PR removing the explicit casting by utilizing a state parameter.
Thank you all for the quick fix!
Which issue does this PR close?
Closes #14406
Rationale for this change
Fix the behaviour for limit with CoalescePartitionsExec.
CoalescePartitionsExec merges all partitions into one, but each partition only has its own local limit, so we should not remove the global limit above the CoalescePartitionsExec.
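As a rough illustration (an assumed plan shape, not the exact plan from the issue), with two partitions a plan like the following can return up to two rows if only the per-partition limits remain:

```
GlobalLimitExec: skip=0, fetch=1                             <-- must stay above the merge to cap the total
  CoalescePartitionsExec                                     <-- merges both partitions into one stream
    SortExec: TopK(fetch=1), preserve_partitioning=[true]    <-- only caps each partition at one row
      ... (2 input partitions)
```

Keeping the global limit here (or, per follow-up issue #14446, letting CoalescePartitionsExec apply a fetch itself) is what guarantees the final row count.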
What changes are included in this PR?
Fix the behaviour for limit with CoalescePartitionsExec.
Are these changes tested?
Yes, an slt test was added.
Are there any user-facing changes?
It fixes the user-facing issue.
Before this PR: