diff --git a/options.go b/options.go index b87b503..5aaa7d9 100644 --- a/options.go +++ b/options.go @@ -104,6 +104,7 @@ type ClientOptions struct { MaxResumePubInFlight int // // 0 = no limit; otherwise this is the maximum simultaneous messages sent while resuming Dialer *net.Dialer CustomOpenConnectionFn OpenConnectionFunc + AutoAckDisabled bool } // NewClientOptions will create a new ClientClientOptions type with some @@ -147,6 +148,7 @@ func NewClientOptions() *ClientOptions { WebsocketOptions: &WebsocketOptions{}, Dialer: &net.Dialer{Timeout: 30 * time.Second}, CustomOpenConnectionFn: nil, + AutoAckDisabled: false, } return o } @@ -446,3 +448,10 @@ func (o *ClientOptions) SetCustomOpenConnectionFn(customOpenConnectionFn OpenCon } return o } + +// SetAutoAckDisabled enables or disables the Automated Acking of Messages received by the handler. +// By default it is set to false. Setting it to true will disable the auto-ack globally. +func (o *ClientOptions) SetAutoAckDisabled(autoAckDisabled bool) *ClientOptions { + o.AutoAckDisabled = autoAckDisabled + return o +} diff --git a/router.go b/router.go index bfaef61..bd05a0c 100644 --- a/router.go +++ b/router.go @@ -186,7 +186,9 @@ func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order wg.Add(1) go func() { hd(client, m) - m.Ack() + if !client.options.AutoAckDisabled { + m.Ack() + } wg.Done() }() } @@ -201,7 +203,9 @@ func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order wg.Add(1) go func() { r.defaultHandler(client, m) - m.Ack() + if !client.options.AutoAckDisabled { + m.Ack() + } wg.Done() }() } @@ -212,7 +216,9 @@ func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order r.RUnlock() for _, handler := range handlers { handler(client, m) - m.Ack() + if !client.options.AutoAckDisabled { + m.Ack() + } } // DEBUG.Println(ROU, "matchAndDispatch handled message") }