Skip to content

Commit

Permalink
Add max_packet_size option to MQTT Source
Browse files Browse the repository at this point in the history
  • Loading branch information
florissmit10 committed Sep 12, 2024
1 parent 9533cd9 commit 54addf0
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 0 deletions.
13 changes: 13 additions & 0 deletions src/connector/src/connector_common/mqtt_common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use rumqttc::tokio_rustls::rustls;
use rumqttc::v5::mqttbytes::QoS;
use rumqttc::v5::{AsyncClient, EventLoop, MqttOptions};
use rumqttc::v5::mqttbytes::v5::ConnectProperties;
use serde_derive::Deserialize;
use serde_with::{serde_as, DisplayFromStr};
use strum_macros::{Display, EnumString};
Expand Down Expand Up @@ -71,6 +72,10 @@ pub struct MqttCommon {
#[serde_as(as = "Option<DisplayFromStr>")]
pub inflight_messages: Option<usize>,

/// The max size of messages received by the MQTT client
#[serde_as(as = "Option<DisplayFromStr>")]
pub max_packet_size: Option<u32>,

/// Path to CA certificate file for verifying the broker's key.
#[serde(rename = "tls.client_key")]
pub ca: Option<String>,
Expand Down Expand Up @@ -111,6 +116,14 @@ impl MqttCommon {

options.set_clean_start(self.clean_start);

if let Some(max_packet_size) = self.max_packet_size{
options.set_connect_properties({
let mut props = ConnectProperties::new();
props.max_packet_size = Some(max_packet_size);
props
});
}

if ssl {
let tls_config = self.get_tls_config()?;
options.set_transport(rumqttc::Transport::tls_with_config(
Expand Down
4 changes: 4 additions & 0 deletions src/connector/with_options_sink.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -717,6 +717,10 @@ MqttConfig:
field_type: usize
comments: The maximum number of inflight messages. Defaults to 100
required: false
- name: max_packet_size
field_type: u32
comments: The max size of messages received by the MQTT client
required: false
- name: tls.client_key
field_type: String
comments: Path to CA certificate file for verifying the broker's key.
Expand Down
4 changes: 4 additions & 0 deletions src/connector/with_options_source.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,10 @@ MqttProperties:
field_type: usize
comments: The maximum number of inflight messages. Defaults to 100
required: false
- name: max_packet_size
field_type: u32
comments: The max size of messages received by the MQTT client
required: false
- name: tls.client_key
field_type: String
comments: Path to CA certificate file for verifying the broker's key.
Expand Down

0 comments on commit 54addf0

Please sign in to comment.