From 5715f21f7551f84b6e89b0dc17db1faafe0523dd Mon Sep 17 00:00:00 2001 From: Guodong Li Date: Tue, 21 Jul 2020 15:42:14 -0700 Subject: [PATCH] Fix a possible race that uses goroutines on loop iterator variables. Within a loop "for k, v := range m", variables k and v are shared by all the loop iterations, as described in the Go specification (https://golang.org/ref/spec): "The iteration variables may be declared by the "range" clause using a form of short variable declaration (:=). In this case their types are set to the types of the respective iteration values and their scope is the block of the "for" statement; they are re-used in each iteration. If the iteration variables are declared outside the "for" statement, after execution their values will be those of the last iteration." In the related code, variable "sub" is shared by all the loop iterations. All the go routines within the loop may refer to a non-deterministic value (e.g. the last value updated by the loop), e.g., all sub.GetName() calls refer to the same subscription, and return the same name. for _, sub := range lister.c.GetSubscription() { ... go s.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) { ... lister.cache[sub.GetName()][msg.Attributes["name"]] = msg.PublishTime }() } More information on the bug can be found at https://github.com/golang/go/wiki/CommonMistakes#using-goroutines-on-loop-iterator-variables. Found by #deepgo. PiperOrigin-RevId: 322458151 --- rds/gcp/pubsub.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/rds/gcp/pubsub.go b/rds/gcp/pubsub.go index 4bd5032f..745cce9e 100644 --- a/rds/gcp/pubsub.go +++ b/rds/gcp/pubsub.go @@ -185,16 +185,17 @@ func newPubSubMsgsLister(project string, c *configpb.PubSubMessages, l *logger.L return nil, err } - lister.subs[sub.GetName()] = s - lister.cache[sub.GetName()] = make(map[string]time.Time) + name := sub.GetName() + lister.subs[name] = s + lister.cache[name] = make(map[string]time.Time) - lister.l.Infof("pubsub: Receiving pub/sub messages for project (%s) and subscription (%s)", lister.project, sub.GetName()) + lister.l.Infof("pubsub: Receiving pub/sub messages for project (%s) and subscription (%s)", lister.project, name) go s.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) { lister.mu.Lock() defer lister.mu.Unlock() lister.l.Infof("pubsub: Adding message with name: %s, message id: %s, publish time: %s", msg.Attributes["name"], msg.ID, msg.PublishTime) - lister.cache[sub.GetName()][msg.Attributes["name"]] = msg.PublishTime + lister.cache[name][msg.Attributes["name"]] = msg.PublishTime msg.Ack() }) }