Skip to content

Commit

Permalink
feat: new settings stream_consume_batch_size_hint (#17102)
Browse files Browse the repository at this point in the history
* feat: new settings `stream_consume_batch_size`

* logic tests

* more logic test
  • Loading branch information
dantengsky authored Dec 24, 2024
1 parent ee3e395 commit cee1496
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 2 deletions.
26 changes: 24 additions & 2 deletions src/query/service/src/sessions/query_ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1046,7 +1046,8 @@ impl TableContext for QueryContext {
database: &str,
table: &str,
) -> Result<Arc<dyn Table>> {
self.get_table_from_shared(catalog, database, table, None)
let batch_size = self.get_settings().get_stream_consume_batch_size_hint()?;
self.get_table_from_shared(catalog, database, table, batch_size)
.await
}

Expand All @@ -1062,6 +1063,23 @@ impl TableContext for QueryContext {
table: &str,
max_batch_size: Option<u64>,
) -> Result<Arc<dyn Table>> {
let max_batch_size = {
match max_batch_size {
Some(v) => {
// use the batch size specified in the statement
Some(v)
}
None => {
if let Some(v) = self.get_settings().get_stream_consume_batch_size_hint()? {
info!("overriding max_batch_size of stream consumption using value specified in setting: {}", v);
Some(v)
} else {
None
}
}
}
};

let table = self
.get_table_from_shared(catalog, database, table, max_batch_size)
.await?;
Expand All @@ -1070,7 +1088,11 @@ impl TableContext for QueryContext {
let actual_batch_limit = stream.max_batch_size();
if actual_batch_limit != max_batch_size {
return Err(ErrorCode::StorageUnsupported(
"Within the same transaction, the batch size for a stream must remain consistent",
format!(
"Within the same transaction, the batch size for a stream must remain consistent {:?} {:?}",
actual_batch_limit, max_batch_size
)

));
}
} else if max_batch_size.is_some() {
Expand Down
7 changes: 7 additions & 0 deletions src/query/settings/src/settings_default.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1144,6 +1144,13 @@ impl DefaultSettings {
scope: SettingScope::Global,
range: None,
}),
("stream_consume_batch_size_hint", DefaultSettingValue {
value: UserSettingValue::UInt64(0),
desc: "Hint for batch size during stream consumption. Set it to 0 to disable it. Larger values may improve throughput but could impose greater pressure on stream consumers.",
mode: SettingMode::Both,
scope: SettingScope::Both,
range: Some(SettingRange::Numeric(0..=u64::MAX)),
}),
]);

Ok(Arc::new(DefaultSettings {
Expand Down
5 changes: 5 additions & 0 deletions src/query/settings/src/settings_getter_setter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -843,4 +843,9 @@ impl Settings {
/// Returns the current value of the `network_policy` setting.
///
/// # Errors
/// Propagates any error returned by `try_get_string`.
pub fn get_network_policy(&self) -> Result<String> {
self.try_get_string("network_policy")
}

/// Returns the batch-size hint for stream consumption, read from the
/// `stream_consume_batch_size_hint` setting.
///
/// A stored value of `0` means the hint is disabled and is reported as
/// `None`; any other value is returned as `Some(value)`.
///
/// # Errors
/// Propagates any error returned by `try_get_u64`.
pub fn get_stream_consume_batch_size_hint(&self) -> Result<Option<u64>> {
    match self.try_get_u64("stream_consume_batch_size_hint")? {
        // Zero is the "disabled" sentinel for this setting.
        0 => Ok(None),
        hint => Ok(Some(hint)),
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,72 @@ select a from s order by a;
statement error 2735
select a from s with (xx = 2);

#################################################
# test setting `stream_consume_batch_size_hint` #
#################################################

statement ok
create table t_settings (c int);

statement ok
CREATE STREAM s_t_settings ON TABLE t_settings;

statement ok
INSERT INTO t_settings values(1);

statement ok
INSERT INTO t_settings values(2);

statement ok
INSERT INTO t_settings values(3);

#####################################
# expect 3 rows, without batch size #
#####################################
query I
select c from s_t_settings order by c;
----
1
2
3

############################################
# set max_batch_size using session setting #
# expects 2 rows #
############################################

statement ok
set stream_consume_batch_size_hint = 2;

query I
select c from s_t_settings order by c;
----
1
2

#######################################################################
# max_batch_size specified in query takes precedence over the setting #
#######################################################################

query I
select c from s_t_settings with (max_batch_size = 1);
----
1

##################################
# set hint to 0, will disable it #
##################################

statement ok
set stream_consume_batch_size_hint = 0;

# expects 3 rows
query I
select c from s_t_settings order by c;
----
1
2
3


statement ok
Expand Down

0 comments on commit cee1496

Please sign in to comment.