From c6d9d39ab1c6646807d84ac3eb2a4d1d065c885b Mon Sep 17 00:00:00 2001 From: shacheng Date: Tue, 5 Mar 2024 10:40:24 +0800 Subject: [PATCH] The producer can be easily blocked due to a race condition in Broker.throttleTimer, which may result in a panic. add throttleTimerLock to protect Fixes #2823 Signed-off-by: chengsha --- broker.go | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/broker.go b/broker.go index 268696cf4..f86a70897 100644 --- a/broker.go +++ b/broker.go @@ -59,7 +59,8 @@ type Broker struct { kerberosAuthenticator GSSAPIKerberosAuth clientSessionReauthenticationTimeMs int64 - throttleTimer *time.Timer + throttleTimer *time.Timer + throttleTimerLock sync.Mutex } // SASLMechanism specifies the SASL mechanism the client uses to authenticate with the broker @@ -86,6 +87,10 @@ const ( SASLExtKeyAuth = "auth" ) +var ( + emptyTimer = time.NewTimer(time.Duration(0)) +) + // AccessToken contains an access token used to authenticate a // SASL/OAUTHBEARER client along with associated metadata. type AccessToken struct { @@ -1697,6 +1702,8 @@ func (b *Broker) handleThrottledResponse(resp protocolBody) { } func (b *Broker) setThrottle(throttleTime time.Duration) { + b.throttleTimerLock.Lock() + defer b.throttleTimerLock.Unlock() if b.throttleTimer != nil { // if there is an existing timer stop/clear it if !b.throttleTimer.Stop() { @@ -1707,6 +1714,8 @@ func (b *Broker) setThrottle(throttleTime time.Duration) { } func (b *Broker) waitIfThrottled() { + b.throttleTimerLock.Lock() + defer b.throttleTimerLock.Unlock() if b.throttleTimer != nil { DebugLogger.Printf("broker/%d waiting for throttle timer\n", b.ID()) <-b.throttleTimer.C