From ecf43f43e48f4342e627dc128803a6f4f8537f6e Mon Sep 17 00:00:00 2001 From: nap_all_day <44193366+napallday@users.noreply.github.com> Date: Tue, 25 Jul 2023 20:35:16 +0800 Subject: [PATCH] fix: concurrent issue on updateMetaDataMs (#2522) Signed-off-by: napallday --- client.go | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/client.go b/client.go index 4182c33f2..d9fb77d64 100644 --- a/client.go +++ b/client.go @@ -132,10 +132,10 @@ const ( ) type client struct { - // updateMetaDataMs stores the time at which metadata was lasted updated. + // updateMetadataMs stores the time at which metadata was lasted updated. // Note: this accessed atomically so must be the first word in the struct // as per golang/go#41970 - updateMetaDataMs int64 + updateMetadataMs int64 conf *Config closer, closed chan none // for shutting down background metadata updater @@ -975,8 +975,8 @@ func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int, time.Sleep(backoff) } - t := atomic.LoadInt64(&client.updateMetaDataMs) - if time.Since(time.Unix(t/1e3, 0)) < backoff { + t := atomic.LoadInt64(&client.updateMetadataMs) + if time.Since(time.UnixMilli(t)) < backoff { return err } attemptsRemaining-- @@ -1000,10 +1000,7 @@ func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int, req := NewMetadataRequest(client.conf.Version, topics) req.AllowAutoTopicCreation = allowAutoTopicCreation - t := atomic.LoadInt64(&client.updateMetaDataMs) - if !atomic.CompareAndSwapInt64(&client.updateMetaDataMs, t, time.Now().UnixNano()/int64(time.Millisecond)) { - return nil - } + atomic.StoreInt64(&client.updateMetadataMs, time.Now().UnixMilli()) response, err := broker.GetMetadata(req) var kerror KError