diff --git a/src/connector/src/connector_common/mqtt_common.rs b/src/connector/src/connector_common/mqtt_common.rs index c967cf215fd2d..3747166ee88e0 100644 --- a/src/connector/src/connector_common/mqtt_common.rs +++ b/src/connector/src/connector_common/mqtt_common.rs @@ -15,6 +15,7 @@ use rumqttc::tokio_rustls::rustls; use rumqttc::v5::mqttbytes::QoS; use rumqttc::v5::{AsyncClient, EventLoop, MqttOptions}; +use rumqttc::v5::mqttbytes::v5::ConnectProperties; use serde_derive::Deserialize; use serde_with::{serde_as, DisplayFromStr}; use strum_macros::{Display, EnumString}; @@ -71,6 +72,10 @@ pub struct MqttCommon { #[serde_as(as = "Option")] pub inflight_messages: Option, + /// The max size of messages received by the MQTT client + #[serde_as(as = "Option")] + pub max_packet_size: Option, + /// Path to CA certificate file for verifying the broker's key. #[serde(rename = "tls.client_key")] pub ca: Option, @@ -111,6 +116,14 @@ impl MqttCommon { options.set_clean_start(self.clean_start); + if let Some(max_packet_size) = self.max_packet_size{ + options.set_connect_properties({ + let mut props = ConnectProperties::new(); + props.max_packet_size = Some(max_packet_size); + props + }); + } + if ssl { let tls_config = self.get_tls_config()?; options.set_transport(rumqttc::Transport::tls_with_config( diff --git a/src/connector/with_options_sink.yaml b/src/connector/with_options_sink.yaml index 1af3435eaea24..72e800e82a48b 100644 --- a/src/connector/with_options_sink.yaml +++ b/src/connector/with_options_sink.yaml @@ -717,6 +717,10 @@ MqttConfig: field_type: usize comments: The maximum number of inflight messages. Defaults to 100 required: false + - name: max_packet_size + field_type: u32 + comments: The max size of messages received by the MQTT client + required: false - name: tls.client_key field_type: String comments: Path to CA certificate file for verifying the broker's key. diff --git a/src/connector/with_options_source.yaml b/src/connector/with_options_source.yaml index c54dce97ad1cd..afa387b48811a 100644 --- a/src/connector/with_options_source.yaml +++ b/src/connector/with_options_source.yaml @@ -493,6 +493,10 @@ MqttProperties: field_type: usize comments: The maximum number of inflight messages. Defaults to 100 required: false + - name: max_packet_size + field_type: u32 + comments: The max size of messages received by the MQTT client + required: false - name: tls.client_key field_type: String comments: Path to CA certificate file for verifying the broker's key.