From cee1496f5d83e59c3402861a8dfd7ed1b65fadb3 Mon Sep 17 00:00:00 2001 From: dantengsky Date: Tue, 24 Dec 2024 14:56:02 +0800 Subject: [PATCH] feat: new settings `stream_consume_batch_size_hint` (#17102) * feat: new settings `stream_consume_batch_size` * logic tests * more logic test --- src/query/service/src/sessions/query_ctx.rs | 26 +++++++- src/query/settings/src/settings_default.rs | 7 ++ .../settings/src/settings_getter_setter.rs | 5 ++ .../06_0006_stream_batch_limit.test | 66 +++++++++++++++++++ 4 files changed, 102 insertions(+), 2 deletions(-) diff --git a/src/query/service/src/sessions/query_ctx.rs b/src/query/service/src/sessions/query_ctx.rs index c372651b9dd75..c1a45845bbdcc 100644 --- a/src/query/service/src/sessions/query_ctx.rs +++ b/src/query/service/src/sessions/query_ctx.rs @@ -1046,7 +1046,8 @@ impl TableContext for QueryContext { database: &str, table: &str, ) -> Result> { - self.get_table_from_shared(catalog, database, table, None) + let batch_size = self.get_settings().get_stream_consume_batch_size_hint()?; + self.get_table_from_shared(catalog, database, table, batch_size) .await } @@ -1062,6 +1063,23 @@ impl TableContext for QueryContext { table: &str, max_batch_size: Option, ) -> Result> { + let max_batch_size = { + match max_batch_size { + Some(v) => { + // use the batch size specified in the statement + Some(v) + } + None => { + if let Some(v) = self.get_settings().get_stream_consume_batch_size_hint()? { + info!("overriding max_batch_size of stream consumption using value specified in setting: {}", v); + Some(v) + } else { + None + } + } + } + }; + let table = self .get_table_from_shared(catalog, database, table, max_batch_size) .await?; @@ -1070,7 +1088,11 @@ impl TableContext for QueryContext { let actual_batch_limit = stream.max_batch_size(); if actual_batch_limit != max_batch_size { return Err(ErrorCode::StorageUnsupported( - "Within the same transaction, the batch size for a stream must remain consistent", + format!( + "Within the same transaction, the batch size for a stream must remain consistent {:?} {:?}", + actual_batch_limit, max_batch_size + ) + )); } } else if max_batch_size.is_some() { diff --git a/src/query/settings/src/settings_default.rs b/src/query/settings/src/settings_default.rs index 6ddf21eede6df..b895d7ffd65d7 100644 --- a/src/query/settings/src/settings_default.rs +++ b/src/query/settings/src/settings_default.rs @@ -1144,6 +1144,13 @@ impl DefaultSettings { scope: SettingScope::Global, range: None, }), + ("stream_consume_batch_size_hint", DefaultSettingValue { + value: UserSettingValue::UInt64(0), + desc: "Hint for batch size during stream consumption. Set it to 0 to disable it. Larger values may improve throughput but could impose greater pressure on stream consumers.", + mode: SettingMode::Both, + scope: SettingScope::Both, + range: Some(SettingRange::Numeric(0..=u64::MAX)), + }), ]); Ok(Arc::new(DefaultSettings { diff --git a/src/query/settings/src/settings_getter_setter.rs b/src/query/settings/src/settings_getter_setter.rs index 29b3088f3fc0a..21d12dbb56fcb 100644 --- a/src/query/settings/src/settings_getter_setter.rs +++ b/src/query/settings/src/settings_getter_setter.rs @@ -843,4 +843,9 @@ impl Settings { pub fn get_network_policy(&self) -> Result { self.try_get_string("network_policy") } + + pub fn get_stream_consume_batch_size_hint(&self) -> Result> { + let v = self.try_get_u64("stream_consume_batch_size_hint")?; + Ok(if v == 0 { None } else { Some(v) }) + } } diff --git a/tests/sqllogictests/suites/ee/06_ee_stream/06_0006_stream_batch_limit.test b/tests/sqllogictests/suites/ee/06_ee_stream/06_0006_stream_batch_limit.test index 423ef5dbd69fb..956f1d84eecf9 100644 --- a/tests/sqllogictests/suites/ee/06_ee_stream/06_0006_stream_batch_limit.test +++ b/tests/sqllogictests/suites/ee/06_ee_stream/06_0006_stream_batch_limit.test @@ -282,6 +282,72 @@ select a from s order by a; statement error 2735 select a from s with (xx = 2); +################################################# +# test setting `stream_consume_batch_size_hint` # +################################################# + +statement ok +create table t_settings (c int); + +statement ok +CREATE STREAM s_t_settings ON TABLE t_settings; + +statement ok +INSERT INTO t_settings values(1); + +statement ok +INSERT INTO t_settings values(2); + +statement ok +INSERT INTO t_settings values(3); + +##################################### +# expect 3 rows, without batch size # +##################################### +query I +select c from s_t_settings order by c; +---- +1 +2 +3 + +############################################ +# set max_batch_size using session setting # +# expects 2 rows # +############################################ + +statement ok +set stream_consume_batch_size_hint = 2; + +query I +select c from s_t_settings order by c; +---- +1 +2 + +####################################################### +# max_batch_size specified in query has high priority # +####################################################### + +query I +select c from s_t_settings with (max_batch_size = 1); +---- +1 + +################################## +# set hint to 0, will disable it # +################################## + +statement ok +set stream_consume_batch_size_hint = 0; + +# expects 3 rows +query I +select c from s_t_settings order by c; +---- +1 +2 +3 statement ok