Skip to content

Commit

Permalink
fix(pubsub/pstest): Clear Subscription when calling ClearMessages.
Browse files Browse the repository at this point in the history
Currently, `ClearMessages()` only clears `Server` messages, not touching any messages that have been published to a Subscription but not yet delivered.  This can cause odd behaviour if those messages are delivered as `pstest` assumes that any such messages exist in the Server's `msgsByID` map.  Attempting to ModAck such a message results in a NPE.
  • Loading branch information
mikeklaas authored and hongalex committed May 2, 2023
1 parent 036656b commit 6de8eda
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 4 deletions.
3 changes: 3 additions & 0 deletions pubsub/pstest/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,9 @@ func (s *Server) ClearMessages() {
s.GServer.mu.Lock()
s.GServer.msgs = nil
s.GServer.msgsByID = make(map[string]*Message)
for _, sub := range s.GServer.subs {
sub.msgs = map[string]*message{}
}
s.GServer.mu.Unlock()
}

Expand Down
22 changes: 18 additions & 4 deletions pubsub/pstest/fake_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -472,13 +472,19 @@ func TestPublishOrdered(t *testing.T) {
}

func TestClearMessages(t *testing.T) {
s := NewServer()
defer s.Close()
pclient, sclient, s, cleanup := newFake(context.TODO(), t)
defer cleanup()

top := mustCreateTopic(context.TODO(), t, pclient, &pb.Topic{Name: "projects/P/topics/T"})
sub := mustCreateSubscription(context.TODO(), t, sclient, &pb.Subscription{
Name: "projects/P/subscriptions/S",
Topic: top.Name,
AckDeadlineSeconds: 10,
})

for i := 0; i < 3; i++ {
s.Publish("projects/p/topics/t", []byte("hello"), nil)
s.Publish(top.Name, []byte("hello"), nil)
}
s.Wait()
msgs := s.Messages()
if got, want := len(msgs), 3; got != want {
t.Errorf("got %d messages, want %d", got, want)
Expand All @@ -488,6 +494,14 @@ func TestClearMessages(t *testing.T) {
if got, want := len(msgs), 0; got != want {
t.Errorf("got %d messages, want %d", got, want)
}

res, err := sclient.Pull(context.Background(), &pb.PullRequest{Subscription: sub.Name})
if err != nil {
t.Fatal(err)
}
if len(res.ReceivedMessages) != 0 {
t.Errorf("got %d messages, want zero", len(res.ReceivedMessages))
}
}

// Note: this sets the fake's "now" time, so it is sensitive to concurrent changes to "now".
Expand Down

0 comments on commit 6de8eda

Please sign in to comment.