Skip to content

Commit

Permalink
fix(pubsub): fix out of order issue when exactly once is enabled (#9472)
Browse files Browse the repository at this point in the history
* fix(pubsub): fix out of ordering issue with exactly once

* add ordering test and fix race condition with resource cleanup

* remove TODO comment
  • Loading branch information
hongalex authored Feb 26, 2024
1 parent 92e7b7f commit e89fd6c
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 20 deletions.
124 changes: 106 additions & 18 deletions pubsub/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1177,29 +1177,32 @@ func TestIntegration_OrderedKeys_Basic(t *testing.T) {
}

received := make(chan string, numItems)
ctx, cancel := context.WithCancel(ctx)
go func() {
if err := sub.Receive(ctx, func(ctx context.Context, msg *Message) {
defer msg.Ack()
if msg.OrderingKey != orderingKey {
t.Errorf("got ordering key %s, expected %s", msg.OrderingKey, orderingKey)
}

received <- string(msg.Data)
}); err != nil {
if c := status.Code(err); c != codes.Canceled {
t.Error(err)
for i := 0; i < numItems; i++ {
select {
case r := <-received:
if got, want := r, fmt.Sprintf("item-%d", i); got != want {
t.Errorf("%d: got %s, want %s", i, got, want)
}
case <-time.After(30 * time.Second):
t.Errorf("timed out after 30s waiting for item %d", i)
cancel()
}
}
cancel()
}()

for i := 0; i < numItems; i++ {
select {
case r := <-received:
if got, want := r, fmt.Sprintf("item-%d", i); got != want {
t.Fatalf("%d: got %s, want %s", i, got, want)
}
case <-time.After(30 * time.Second):
t.Fatalf("timed out after 30s waiting for item %d", i)
if err := sub.Receive(ctx, func(ctx context.Context, msg *Message) {
defer msg.Ack()
if msg.OrderingKey != orderingKey {
t.Errorf("got ordering key %s, expected %s", msg.OrderingKey, orderingKey)
}

received <- string(msg.Data)
}); err != nil {
if c := status.Code(err); c != codes.Canceled {
t.Error(err)
}
}
}
Expand Down Expand Up @@ -1445,6 +1448,91 @@ func TestIntegration_OrderedKeys_SubscriptionOrdering(t *testing.T) {
}
}

func TestIntegration_OrderingWithExactlyOnce(t *testing.T) {
ctx := context.Background()
client := integrationTestClient(ctx, t, option.WithEndpoint("us-west1-pubsub.googleapis.com:443"))
defer client.Close()

topic, err := createTopicWithRetry(ctx, t, client, topicIDs.New(), nil)
if err != nil {
t.Fatal(err)
}
defer topic.Delete(ctx)
defer topic.Stop()
exists, err := topic.Exists(ctx)
if err != nil {
t.Fatal(err)
}
if !exists {
t.Fatalf("topic %v should exist, but it doesn't", topic)
}
var sub *Subscription
if sub, err = createSubWithRetry(ctx, t, client, subIDs.New(), SubscriptionConfig{
Topic: topic,
EnableMessageOrdering: true,
EnableExactlyOnceDelivery: true,
}); err != nil {
t.Fatal(err)
}
defer sub.Delete(ctx)
exists, err = sub.Exists(ctx)
if err != nil {
t.Fatal(err)
}
if !exists {
t.Fatalf("subscription %s should exist, but it doesn't", sub.ID())
}

topic.PublishSettings.DelayThreshold = time.Second
topic.EnableMessageOrdering = true

orderingKey := "some-ordering-key"
numItems := 10
for i := 0; i < numItems; i++ {
r := topic.Publish(ctx, &Message{
ID: fmt.Sprintf("id-%d", i),
Data: []byte(fmt.Sprintf("item-%d", i)),
OrderingKey: orderingKey,
})
go func() {
if _, err := r.Get(ctx); err != nil {
t.Error(err)
}
}()
}

received := make(chan string, numItems)
ctx, cancel := context.WithCancel(ctx)
go func() {
for i := 0; i < numItems; i++ {
select {
case r := <-received:
if got, want := r, fmt.Sprintf("item-%d", i); got != want {
t.Errorf("%d: got %s, want %s", i, got, want)
}
case <-time.After(30 * time.Second):
t.Errorf("timed out after 30s waiting for item %d", i)
cancel()
}
}
cancel()
}()

if err := sub.Receive(ctx, func(ctx context.Context, msg *Message) {
defer msg.Ack()
if msg.OrderingKey != orderingKey {
t.Errorf("got ordering key %s, expected %s", msg.OrderingKey, orderingKey)
}

received <- string(msg.Data)
}); err != nil {
if c := status.Code(err); c != codes.Canceled {
t.Error(err)
}
}

}

func TestIntegration_CreateSubscription_DeadLetterPolicy(t *testing.T) {
t.Parallel()
ctx := context.Background()
Expand Down
8 changes: 6 additions & 2 deletions pubsub/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,9 +313,13 @@ func (it *messageIterator) receive(maxToPull int32) ([]*Message, error) {
}
}
// Only return for processing messages that were successfully modack'ed.
// Iterate over the original messages slice for ordering.
v := make([]*ipubsub.Message, 0, len(pendingMessages))
for _, m := range pendingMessages {
v = append(v, m)
for _, m := range msgs {
ackID := msgAckID(m)
if _, ok := pendingMessages[ackID]; ok {
v = append(v, m)
}
}
return v, nil
}
Expand Down

0 comments on commit e89fd6c

Please sign in to comment.