Skip to content

Commit

Permalink
[BugFix] Pushdown distinct agg across window not support complex expr (#36357)

Signed-off-by: satanson <[email protected]>
(cherry picked from commit 7a0c140)

# Conflicts:
#	fe/fe-core/src/test/java/com/starrocks/sql/plan/ReplayFromDumpTest.java
  • Loading branch information
satanson authored and mergify[bot] committed Dec 5, 2023
1 parent d1f2887 commit 8f2ea2a
Show file tree
Hide file tree
Showing 7 changed files with 175 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.starrocks.sql.optimizer.operator.logical.LogicalScanOperator;
import com.starrocks.sql.optimizer.operator.logical.LogicalTopNOperator;
import com.starrocks.sql.optimizer.operator.logical.LogicalValuesOperator;
import com.starrocks.sql.optimizer.operator.logical.LogicalWindowOperator;
import com.starrocks.sql.optimizer.operator.physical.PhysicalAssertOneRowOperator;
import com.starrocks.sql.optimizer.operator.physical.PhysicalCTEAnchorOperator;
import com.starrocks.sql.optimizer.operator.physical.PhysicalCTEConsumeOperator;
Expand Down Expand Up @@ -251,6 +252,26 @@ public OperatorStr visitLogicalJoin(OptExpression optExpression, Integer step) {
return new OperatorStr(sb.toString(), step, Arrays.asList(leftChild, rightChild));
}

/**
 * Renders a logical window operator as a single debug line, with its only input rendered
 * one step deeper as the sole child.
 *
 * <p>The line lists the window calls (output column id followed by the formatted call), the
 * analytic window definition as SQL (or {@code NONE} when the operator has none), the
 * partition-by expressions and the order-by elements.
 *
 * @param optExpression expression whose operator is a {@link LogicalWindowOperator}
 * @param step          depth of this operator; the input is visited with {@code step + 1}
 * @return the rendered operator with the rendered input as its single child
 */
@Override
public OperatorStr visitLogicalWindow(OptExpression optExpression, Integer step) {
    // The input is rendered before anything is read from the window operator itself.
    OperatorStr input = visit(optExpression.getInputs().get(0), step + 1);
    LogicalWindowOperator windowOp = optExpression.getOp().cast();

    StringBuilder text = new StringBuilder("logical window( calls=[");

    // "<column id>: <call>" for every window call, comma separated.
    text.append(windowOp.getWindowCall().entrySet().stream()
            .map(call -> String.format("%d: %s", call.getKey().getId(),
                    scalarOperatorStringFunction.apply(call.getValue())))
            .collect(Collectors.joining(", ")));

    // A missing analytic window is printed as the literal NONE.
    text.append("], window=");
    if (windowOp.getAnalyticWindow() == null) {
        text.append("NONE");
    } else {
        text.append(windowOp.getAnalyticWindow().toSql());
    }

    text.append(", partitionBy=");
    text.append(windowOp.getPartitionExpressions().stream()
            .map(expr -> scalarOperatorStringFunction.apply(expr))
            .collect(Collectors.joining(", ")));

    text.append(", orderBy=");
    text.append(windowOp.getOrderByElements().stream()
            .map(ordering -> ordering.toString())
            .collect(Collectors.joining(", ")));

    text.append(")");
    return new OperatorStr(text.toString(), step, Collections.singletonList(input));
}

@Override
public OperatorStr visitLogicalApply(OptExpression optExpression, Integer step) {
OperatorStr leftChild = visit(optExpression.getInputs().get(0), step + 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,10 @@ public AggregatePushDownContext visitLogicalProject(OptExpression optExpression,
// rewrite
ReplaceColumnRefRewriter rewriter = new ReplaceColumnRefRewriter(projectOp.getColumnRefMap());
context.aggregations.replaceAll((k, v) -> (CallOperator) rewriter.rewrite(v));
context.groupBys.replaceAll((k, v) -> rewriter.rewrite(v));
Set<ColumnRefOperator> groupByUsedCols = context.groupBys.values().stream()
.flatMap(v -> rewriter.rewrite(v).getColumnRefs().stream()).collect(Collectors.toSet());
context.groupBys.clear();
groupByUsedCols.forEach(col -> context.groupBys.put(col, col));

if (projectOp.getColumnRefMap().values().stream().allMatch(ScalarOperator::isColumnRef)) {
return context;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,10 @@ public void testPushDownDistinctAggBelowWindow()
"where month(order_date)=1\n" +
"order by region, order_date";
String plan = UtFrameUtils.getVerboseFragmentPlan(connectContext, q1);
Assert.assertTrue(plan.contains(" 1:AGGREGATE (update finalize)\n" +
Assert.assertTrue(plan, plan.contains(" 1:AGGREGATE (update finalize)\n" +
" | aggregate: sum[([3: income, DECIMAL128(10,2), false]); args: DECIMAL128; " +
"result: DECIMAL128(38,2); args nullable: false; result nullable: true]\n" +
" | group by: [2: order_date, DATE, false], [1: region, VARCHAR, true]\n"));
" | group by: [1: region, VARCHAR, true], [2: order_date, DATE, false]\n"));

Assert.assertTrue(plan.contains(" 0:OlapScanNode\n" +
" table: trans, rollup: trans\n" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -551,10 +551,10 @@ public void testPushDownDistinctAggBelowWindowRewrite() throws Exception {
Pair<QueryDumpInfo, String> replayPair =
getPlanFragment(getDumpInfoFromFile("query_dump/pushdown_distinct_agg_below_window"), null,
TExplainLevel.COSTS);
Assert.assertTrue(replayPair.second.contains(" 1:AGGREGATE (update finalize)\n" +
Assert.assertTrue(replayPair.second, replayPair.second.contains(" 1:AGGREGATE (update finalize)\n" +
" | aggregate: sum[([3: gross, DECIMAL128(10,2), false]); args: DECIMAL128; " +
"result: DECIMAL128(38,2); args nullable: false; result nullable: true]\n" +
" | group by: [2: trans_date, DATE, false], [1: country, VARCHAR, true]\n" +
" | group by: [1: country, VARCHAR, true], [2: trans_date, DATE, false]\n" +
" | cardinality: 49070\n"));
}

Expand Down Expand Up @@ -785,4 +785,40 @@ public void testJoinWithArray() throws Exception {
" |----3:OlapScanNode\n" +
" | TABLE: tbl_mock_024"));
}
<<<<<<< HEAD
=======


@Test
public void testTwoStageAgg() throws Exception {
Pair<QueryDumpInfo, String> replayPair =
getPlanFragment(getDumpInfoFromFile("query_dump/two_stage_agg"),
null, TExplainLevel.COSTS);
Assert.assertTrue(replayPair.second, replayPair.second.contains("1:AGGREGATE (update serialize)\n" +
" | STREAMING"));

Assert.assertTrue(replayPair.second, replayPair.second.contains("0:OlapScanNode\n" +
" table: lineorder_2, rollup: lineorder_2"));
}

@Test
public void testPushDistinctAggDownWindow() throws Exception {
Pair<QueryDumpInfo, String> replayPair =
getPlanFragment(getDumpInfoFromFile("query_dump/pushdown_distinct_agg_below_window2"),
null, TExplainLevel.NORMAL);
System.out.println(replayPair.second);
Assert.assertTrue(replayPair.second, replayPair.second.contains(" 3:ANALYTIC\n" +
" | functions: [, sum(5: sum), ]\n" +
" | partition by: 1: TIME\n" +
" | \n" +
" 2:SORT\n" +
" | order by: <slot 1> 1: TIME ASC\n" +
" | offset: 0\n" +
" | \n" +
" 1:AGGREGATE (update finalize)\n" +
" | output: sum(2: NUM)\n" +
" | group by: 1: TIME"));
}

>>>>>>> 7a0c140fe0 ([BugFix] Pushdown distinct agg across window not support complex expr (#36357))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"statement":"with temp as ( \nselect TIME, ceil(round(sum(`NUM`) OVER (PARTITION BY `TIME` ))) from test_data\n) select distinct * from temp\n","table_meta":{"ays_4.test_data":"CREATE TABLE `test_data` (\n `TIME` varchar(65533) NULL COMMENT \"\",\n `NUM` bigint(20) NULL COMMENT \"\"\n) ENGINE=OLAP \nDUPLICATE KEY(`TIME`)\nDISTRIBUTED BY HASH(`TIME`) BUCKETS 10 \nPROPERTIES (\n\"replication_num\" = \"1\",\n\"in_memory\" = \"false\",\n\"enable_persistent_index\" = \"false\",\n\"replicated_storage\" = \"true\",\n\"compression\" = \"LZ4\"\n);"},"table_row_count":{},"session_variables":"{\"partial_update_mode\":\"auto\",\"cbo_cte_reuse\":true,\"character_set_connection\":\"utf8\",\"cbo_use_correlated_join_estimate\":true,\"enable_insert_strict\":true,\"enable_connector_adaptive_io_tasks\":true,\"tx_isolation\":\"REPEATABLE-READ\",\"enable_hive_metadata_cache_with_insert\":false,\"cbo_cte_reuse_rate_v2\":1.15,\"character_set_results\":\"utf8\",\"enable_count_star_optimization\":true,\"query_excluding_mv_names\":\"\",\"enable_rewrite_simple_agg_to_meta_scan\":false,\"enable_adaptive_sink_dop\":true,\"consistent_hash_virtual_number\":32,\"enable_profile\":false,\"load_mem_limit\":0,\"sql_safe_updates\":0,\"runtime_filter_early_return_selectivity\":0.05,\"enable_local_shuffle_agg\":true,\"disable_function_fold_constants\":false,\"select_ratio_threshold\":0.15,\"query_delivery_timeout\":300,\"collation_database\":\"utf8_general_ci\",\"spill_mem_table_size\":104857600,\"cbo_use_lock_db\":false,\"new_planner_agg_stage\":0,\"use_compute_nodes\":-1,\"collation_connection\":\"utf8_general_ci\",\"resource_group\":\"\",\"profile_limit_fold\":true,\"spill_operator_max_bytes\":1048576000,\"cbo_max_reorder_node_use_dp\":10,\"enable_hive_column_stats\":true,\"enable_groupby_use_output_alias\":false,\"forward_to_leader\":false,\"count_distinct_column_buckets\":1024,\"query_cache_agg_cardinality_limit\":5000000,\"enable_pipeline_query_statistic\":true,\"cboPushDownAggregateMode_v1\":-1,\
"window_partition_mode\":1,\"enable_deliver_batch_fragments\":true,\"enable_tablet_internal_parallel_v2\":true,\"interpolate_passthrough\":true,\"enable_incremental_mv\":false,\"SQL_AUTO_IS_NULL\":false,\"event_scheduler\":\"OFF\",\"max_pipeline_dop\":64,\"broadcast_right_table_scale_factor\":10,\"materialized_view_rewrite_mode\":\"DEFAULT\",\"enable_simplify_case_when\":true,\"runtime_join_filter_push_down_limit\":1024000,\"big_query_log_cpu_second_threshold\":480,\"div_precision_increment\":4,\"runtime_adaptive_dop_max_block_rows_per_driver_seq\":16384,\"log_rejected_record_num\":0,\"cbo_push_down_distinct_below_window\":true,\"sql_mode_v2\":32,\"prefer_cte_rewrite\":false,\"hdfs_backend_selector_scan_range_shuffle\":false,\"pipeline_profile_level\":1,\"parallel_fragment_exec_instance_num\":1,\"max_scan_key_num\":-1,\"net_read_timeout\":60,\"streaming_preaggregation_mode\":\"auto\",\"hive_partition_stats_sample_size\":3000,\"enable_mv_planner\":false,\"enable_collect_table_level_scan_stats\":true,\"profile_timeout\":2,\"cbo_push_down_aggregate\":\"global\",\"spill_encode_level\":7,\"enable_query_dump\":false,\"global_runtime_filter_build_max_size\":67108864,\"enable_rewrite_sum_by_associative_rule\":true,\"query_cache_hot_partition_num\":3,\"enable_prune_complex_types\":true,\"query_cache_type\":0,\"max_parallel_scan_instance_num\":4,\"query_cache_entry_max_rows\":409600,\"enable_mv_optimizer_trace_log\":false,\"connector_io_tasks_per_scan_operator\":16,\"enable_materialized_view_union_rewrite\":true,\"sql_quote_show_create\":true,\"scan_or_to_union_threshold\":50000000,\"enable_exchange_pass_through\":true,\"runtime_profile_report_interval\":10,\"query_cache_entry_max_bytes\":4194304,\"enable_exchange_perf\":false,\"workgroup_id\":0,\"enable_rewrite_groupingsets_to_union_all\":false,\"transmission_compression_type\":\"NO_COMPRESSION\",\"interactive_timeout\":3600,\"use_page_cache\":true,\"big_query_log_scan_bytes_threshold\":10737418240,\"collation_server\":\"utf
8_general_ci\",\"tablet_internal_parallel_mode\":\"auto\",\"enable_pipeline\":false,\"spill_mode\":\"auto\",\"enable_query_debug_trace\":false,\"enable_show_all_variables\":false,\"full_sort_max_buffered_bytes\":16777216,\"wait_timeout\":28800,\"transmission_encode_level\":7,\"query_including_mv_names\":\"\",\"transaction_isolation\":\"REPEATABLE-READ\",\"enable_global_runtime_filter\":true,\"enable_load_profile\":false,\"enable_plan_validation\":true,\"load_transmission_compression_type\":\"NO_COMPRESSION\",\"cbo_enable_low_cardinality_optimize\":true,\"scan_use_query_mem_ratio\":0.3,\"new_planner_optimize_timeout\":10000,\"enable_outer_join_reorder\":true,\"force_schedule_local\":false,\"hudi_mor_force_jni_reader\":false,\"full_sort_late_materialization\":false,\"cbo_enable_greedy_join_reorder\":true,\"range_pruner_max_predicate\":100,\"enable_rbo_table_prune\":false,\"spillable_operator_mask\":-1,\"rpc_http_min_size\":2147482624,\"cbo_debug_alive_backend_number\":0,\"global_runtime_filter_probe_min_size\":102400,\"scan_or_to_union_limit\":4,\"enable_cbo_table_prune\":false,\"enable_parallel_merge\":true,\"nested_mv_rewrite_max_level\":3,\"net_write_timeout\":60,\"cbo_prune_shuffle_column_rate\":0.1,\"hash_join_push_down_right_table\":true,\"pipeline_sink_dop\":0,\"broadcast_row_limit\":15000000,\"enable_populate_block_cache\":true,\"exec_mem_limit\":6442450944,\"enable_sort_aggregate\":false,\"query_cache_force_populate\":false,\"runtime_filter_on_exchange_node\":false,\"disable_join_reorder\":false,\"enable_rule_based_materialized_view_rewrite\":true,\"connector_scan_use_query_mem_ratio\":0.3,\"net_buffer_length\":16384,\"cbo_prune_subfield\":true,\"full_sort_max_buffered_rows\":1024000,\"query_timeout\":28800,\"connector_io_tasks_slow_io_latency_ms\":50,\"cbo_max_reorder_node\":50,\"enable_distinct_column_bucketization\":false,\"enable_big_query_log\":true,\"analyze_mv\":\"sample\",\"runtime_filter_scan_wait_time\":20,\"enable_sync_materialized_view_rewrite\":t
rue,\"prefer_compute_node\":false,\"enable_strict_type\":false,\"group_concat_max_len\":65535,\"parse_tokens_limit\":3500000,\"chunk_size\":4096,\"global_runtime_filter_probe_min_selectivity\":0.5,\"query_mem_limit\":0,\"enable_filter_unused_columns_in_scan_stage\":true,\"enable_scan_block_cache\":false,\"enable_materialized_view_single_table_view_delta_rewrite\":false,\"auto_increment_increment\":1,\"sql_dialect\":\"StarRocks\",\"big_query_log_scan_rows_threshold\":1000000000,\"character_set_client\":\"utf8\",\"autocommit\":true,\"enable_column_expr_predicate\":true,\"enable_runtime_adaptive_dop\":false,\"cbo_cte_max_limit\":10,\"storage_engine\":\"olap\",\"enable_optimizer_trace_log\":false,\"spill_operator_min_bytes\":10485760,\"cbo_enable_dp_join_reorder\":true,\"tx_visible_wait_timeout\":10,\"enable_materialized_view_view_delta_rewrite\":true,\"cbo_max_reorder_node_use_exhaustive\":4,\"enable_sql_digest\":false,\"spill_mem_table_num\":2,\"enable_spill\":false,\"pipeline_dop\":0,\"single_node_exec_plan\":false,\"join_implementation_mode_v2\":\"auto\",\"sql_select_limit\":9223372036854775807,\"enable_materialized_view_rewrite\":true,\"statistic_collect_parallel\":1,\"hdfs_backend_selector_hash_algorithm\":\"consistent\",\"disable_colocate_join\":false,\"max_pushdown_conditions_per_column\":-1,\"default_table_compression\":\"lz4_frame\",\"runtime_adaptive_dop_max_output_amplification_factor\":0,\"innodb_read_only\":true,\"spill_mem_limit_threshold\":0.5,\"cbo_reorder_threshold_use_exhaustive\":6,\"enable_predicate_reorder\":false,\"enable_query_cache\":false,\"max_allowed_packet\":33554432,\"time_zone\":\"Asia/Shanghai\",\"enable_multicolumn_global_runtime_filter\":false,\"character_set_server\":\"utf8\",\"cbo_use_nth_exec_plan\":0,\"io_tasks_per_scan_operator\":4,\"parallel_exchange_instance_num\":-1,\"enable_shared_scan\":false,\"allow_default_partition\":false}","column_statistics":{},"be_number":3,"be_core_stat":{"numOfHardwareCoresPerBe":"{\"10007\":48,\"1007
7\":30,\"23822\":16}","cachedAvgNumOfHardwareCores":-1},"exception":[],"version":"3.1.2","commit_version":"4f3a2ee"}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
-- name: test_push_down_distinct_agg_across_window
DROP TABLE if exists t0;
-- result:
-- !result
-- Two-column table: TIME is the PARTITION BY key of the window queries below, NUM is the summed column.
CREATE TABLE if not exists t0
(
TIME VARCHAR(30) NOT NULL,
NUM BIGINT NOT NULL
) ENGINE=OLAP
DUPLICATE KEY(`TIME`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`TIME`) BUCKETS 1
PROPERTIES(
"replication_num" = "1",
"in_memory" = "false",
"storage_format" = "default"
);
-- result:
-- !result
-- 20 rows over five TIME values; NUM literals are quoted strings and include -9223372036854775808 twice.
INSERT INTO t0
(TIME, NUM)
VALUES
('2022-01-02', '1512'),
('2022-01-03', '-322850'),
('2022-01-03', '-9136270383122058721'),
('2022-01-04', '-9223372036854775808'),
('2022-01-01', '-411260301'),
('2022-01-05', '-814552904955694743'),
('2022-01-03', '-14033619422102'),
('2022-01-04', '2719391845278'),
('2022-01-04', '328803617'),
('2022-01-04', '-2757055605405418'),
('2022-01-01', '-17017206034'),
('2022-01-01', '44644742'),
('2022-01-05', '1389488975808660'),
('2022-01-05', '-2'),
('2022-01-04', '-3'),
('2022-01-05', '-9223372036854775808'),
('2022-01-04', '1175288988'),
('2022-01-02', '6990590275516343015'),
('2022-01-05', '-18'),
('2022-01-05', '-380919416214054');
-- result:
-- !result
-- Run 1: cbo_push_down_distinct_below_window set to 'false'.
set cbo_push_down_distinct_below_window='false';
-- result:
-- !result
with temp as (
select TIME, ceil(round(sum(`NUM`) OVER (PARTITION BY `TIME` ))) as N from t0
) select sum(murmur_hash3_32(TIME) + murmur_hash3_32(N)) as fingerprint from(select distinct TIME, N from temp)t1;
-- result:
-5805963690
-- !result
-- Run 2: the same query with cbo_push_down_distinct_below_window set to 'true'; the recorded fingerprint equals run 1.
set cbo_push_down_distinct_below_window='true';
-- result:
-- !result
with temp as (
select TIME, ceil(round(sum(`NUM`) OVER (PARTITION BY `TIME` ))) as N from t0
) select sum(murmur_hash3_32(TIME) + murmur_hash3_32(N)) as fingerprint from(select distinct TIME, N from temp)t1;
-- result:
-5805963690
-- !result
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
-- name: test_push_down_distinct_agg_across_window
DROP TABLE if exists t0;

-- Two-column table: TIME is the PARTITION BY key of the window queries below, NUM is the summed column.
CREATE TABLE if not exists t0
(
TIME VARCHAR(30) NOT NULL,
NUM BIGINT NOT NULL
) ENGINE=OLAP
DUPLICATE KEY(`TIME`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`TIME`) BUCKETS 1
PROPERTIES(
"replication_num" = "1",
"in_memory" = "false",
"storage_format" = "default"
);
-- 20 rows over five TIME values; NUM literals are quoted strings and include -9223372036854775808 twice.
INSERT INTO t0
(TIME, NUM)
VALUES
('2022-01-02', '1512'),
('2022-01-03', '-322850'),
('2022-01-03', '-9136270383122058721'),
('2022-01-04', '-9223372036854775808'),
('2022-01-01', '-411260301'),
('2022-01-05', '-814552904955694743'),
('2022-01-03', '-14033619422102'),
('2022-01-04', '2719391845278'),
('2022-01-04', '328803617'),
('2022-01-04', '-2757055605405418'),
('2022-01-01', '-17017206034'),
('2022-01-01', '44644742'),
('2022-01-05', '1389488975808660'),
('2022-01-05', '-2'),
('2022-01-04', '-3'),
('2022-01-05', '-9223372036854775808'),
('2022-01-04', '1175288988'),
('2022-01-02', '6990590275516343015'),
('2022-01-05', '-18'),
('2022-01-05', '-380919416214054');
-- Run 1: cbo_push_down_distinct_below_window set to 'false'.
set cbo_push_down_distinct_below_window='false';
with temp as (
select TIME, ceil(round(sum(`NUM`) OVER (PARTITION BY `TIME` ))) as N from t0
) select sum(murmur_hash3_32(TIME) + murmur_hash3_32(N)) as fingerprint from(select distinct TIME, N from temp)t1;
-- Run 2: the same query with cbo_push_down_distinct_below_window set to 'true'.
set cbo_push_down_distinct_below_window='true';
with temp as (
select TIME, ceil(round(sum(`NUM`) OVER (PARTITION BY `TIME` ))) as N from t0
) select sum(murmur_hash3_32(TIME) + murmur_hash3_32(N)) as fingerprint from(select distinct TIME, N from temp)t1;

0 comments on commit 8f2ea2a

Please sign in to comment.