Skip to content
This repository has been archived by the owner on Nov 5, 2021. It is now read-only.

Commit

Permalink
Fix a possible race that uses goroutines on loop iterator variables.
Browse files Browse the repository at this point in the history
Within a loop "for k, v := range m", variables k and v are shared by all the loop iterations, as described in the Go specification (https://golang.org/ref/spec):

"The iteration variables may be declared by the "range" clause using a form of short variable declaration (:=). In this case their types are set to the types of the respective iteration values and their scope is the block of the "for" statement; they are re-used in each iteration. If the iteration variables are declared outside the "for" statement, after execution their values will be those of the last iteration."

In the related code, variable "sub" is shared by all the loop iterations. All the go routines within the loop may refer to a non-deterministic value (e.g. the last value updated by the loop), e.g., all sub.GetName() calls refer to the same subscription, and return the same name.

for _, sub := range lister.c.GetSubscription() {
  ...
  go s.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
    ...
    lister.cache[sub.GetName()][msg.Attributes["name"]] = msg.PublishTime
  }()
}

More information on the bug can be found at https://github.com/golang/go/wiki/CommonMistakes#using-goroutines-on-loop-iterator-variables.

Found by #deepgo.

PiperOrigin-RevId: 322458151
  • Loading branch information
guodongli-google authored and manugarg committed Jul 22, 2020
1 parent 02c9fbb commit 5715f21
Showing 1 changed file with 5 additions and 4 deletions.
9 changes: 5 additions & 4 deletions rds/gcp/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,16 +185,17 @@ func newPubSubMsgsLister(project string, c *configpb.PubSubMessages, l *logger.L
return nil, err
}

lister.subs[sub.GetName()] = s
lister.cache[sub.GetName()] = make(map[string]time.Time)
name := sub.GetName()
lister.subs[name] = s
lister.cache[name] = make(map[string]time.Time)

lister.l.Infof("pubsub: Receiving pub/sub messages for project (%s) and subscription (%s)", lister.project, sub.GetName())
lister.l.Infof("pubsub: Receiving pub/sub messages for project (%s) and subscription (%s)", lister.project, name)
go s.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
lister.mu.Lock()
defer lister.mu.Unlock()

lister.l.Infof("pubsub: Adding message with name: %s, message id: %s, publish time: %s", msg.Attributes["name"], msg.ID, msg.PublishTime)
lister.cache[sub.GetName()][msg.Attributes["name"]] = msg.PublishTime
lister.cache[name][msg.Attributes["name"]] = msg.PublishTime
msg.Ack()
})
}
Expand Down

0 comments on commit 5715f21

Please sign in to comment.