Skip to content

Commit

Permalink
[ADDED] Drain for jetstream consume methods (#1515)
Browse files Browse the repository at this point in the history
Signed-off-by: Piotr Piotrowski <[email protected]>
  • Loading branch information
piotrpio authored Jan 10, 2024
1 parent ba22476 commit 44fac59
Show file tree
Hide file tree
Showing 9 changed files with 448 additions and 50 deletions.
3 changes: 3 additions & 0 deletions .golangci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ issues:
- linters:
- errcheck
text: "Unsubscribe"
- linters:
- errcheck
text: "Drain"
- linters:
- errcheck
text: "msg.Ack"
Expand Down
14 changes: 7 additions & 7 deletions go_test.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,19 @@ go 1.19

require (
github.com/golang/protobuf v1.4.2
github.com/klauspost/compress v1.17.2
github.com/nats-io/nats-server/v2 v2.10.0
github.com/klauspost/compress v1.17.4
github.com/nats-io/nats-server/v2 v2.10.7
github.com/nats-io/nkeys v0.4.6
github.com/nats-io/nuid v1.0.1
go.uber.org/goleak v1.2.1
golang.org/x/text v0.13.0
golang.org/x/text v0.14.0
google.golang.org/protobuf v1.23.0
)

require (
github.com/minio/highwayhash v1.0.2 // indirect
github.com/nats-io/jwt/v2 v2.5.2 // indirect
golang.org/x/crypto v0.14.0 // indirect
golang.org/x/sys v0.13.0 // indirect
golang.org/x/time v0.3.0 // indirect
github.com/nats-io/jwt/v2 v2.5.3 // indirect
golang.org/x/crypto v0.16.0 // indirect
golang.org/x/sys v0.15.0 // indirect
golang.org/x/time v0.5.0 // indirect
)
29 changes: 14 additions & 15 deletions go_test.sum
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@ github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4=
github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/klauspost/compress v1.17.4 h1:Ej5ixsIri7BrIjBkRZLTo6ghwrEtHFk7ijlczPW4fZ4=
github.com/klauspost/compress v1.17.4/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM=
github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g=
github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
github.com/nats-io/jwt/v2 v2.5.2 h1:DhGH+nKt+wIkDxM6qnVSKjokq5t59AZV5HRcFW0zJwU=
github.com/nats-io/jwt/v2 v2.5.2/go.mod h1:24BeQtRwxRV8ruvC4CojXlx/WQ/VjuwlYiH+vu/+ibI=
github.com/nats-io/nats-server/v2 v2.10.0 h1:rcU++Hzo+wARxtJugrV3J5z5iGdHeVG8tT8Chb3bKDg=
github.com/nats-io/nats-server/v2 v2.10.0/go.mod h1:3PMvMSu2cuK0J9YInRLWdFpFsswKKGUS77zVSAudRto=
github.com/nats-io/jwt/v2 v2.5.3 h1:/9SWvzc6hTfamcgXJ3uYRpgj+QuY2aLNqRiqrKcrpEo=
github.com/nats-io/jwt/v2 v2.5.3/go.mod h1:iysuPemFcc7p4IoYots3IuELSI4EDe9Y0bQMe+I3Bf4=
github.com/nats-io/nats-server/v2 v2.10.7 h1:f5VDy+GMu7JyuFA0Fef+6TfulfCs5nBTgq7MMkFJx5Y=
github.com/nats-io/nats-server/v2 v2.10.7/go.mod h1:V2JHOvPiPdtfDXTuEUsthUnCvSDeFrK4Xn9hRo6du7c=
github.com/nats-io/nkeys v0.4.6 h1:IzVe95ru2CT6ta874rt9saQRkWfe2nFj1NtvYSLqMzY=
github.com/nats-io/nkeys v0.4.6/go.mod h1:4DxZNzenSVd1cYQoAa8948QY3QDjrHfcfVADymtkpts=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
Expand All @@ -26,16 +26,15 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A=
go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4=
golang.org/x/crypto v0.14.0 h1:wBqGXzWJW6m1XrIKlAH0Hs1JJ7+9KBwnIO8v66Q9cHc=
golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4=
golang.org/x/exp v0.0.0-20230905200255-921286631fa9 h1:GoHiUyI/Tp2nVkLI2mCxVkOjsbSXD66ic0XW0js0R9g=
golang.org/x/crypto v0.16.0 h1:mMMrFzRSCF0GvB7Ne27XVtVAaXLrPmgPC7/v0tkwHaY=
golang.org/x/crypto v0.16.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4=
golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE=
golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k=
golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4=
golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc=
golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk=
golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
Expand Down
18 changes: 18 additions & 0 deletions jetstream/ordered.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type (
consumer *orderedConsumer
opts []PullMessagesOpt
done chan struct{}
closed uint32
}

cursor struct {
Expand Down Expand Up @@ -298,6 +299,9 @@ func (s *orderedSubscription) Next() (Msg, error) {
}

func (s *orderedSubscription) Stop() {
if !atomic.CompareAndSwapUint32(&s.closed, 0, 1) {
return
}
sub, ok := s.consumer.currentConsumer.getSubscription("")
if !ok {
return
Expand All @@ -308,6 +312,20 @@ func (s *orderedSubscription) Stop() {
close(s.done)
}

func (s *orderedSubscription) Drain() {
if !atomic.CompareAndSwapUint32(&s.closed, 0, 1) {
return
}
sub, ok := s.consumer.currentConsumer.getSubscription("")
if !ok {
return
}
s.consumer.currentConsumer.Lock()
defer s.consumer.currentConsumer.Unlock()
sub.Drain()
close(s.done)
}

// Fetch is used to retrieve up to a provided number of messages from a stream.
// This method will always send a single request and wait until either all messages are retrieved
// or context reaches its deadline.
Expand Down
121 changes: 97 additions & 24 deletions jetstream/pull.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022-2023 The NATS Authors
// Copyright 2022-2024 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -30,14 +30,31 @@ import (
type (
// MessagesContext supports iterating over a messages on a stream.
MessagesContext interface {
// Next retreives next message on a stream. It will block until the next message is available.
// Next retreives next message on a stream. It will block until the next
// message is available.
Next() (Msg, error)
// Stop closes the iterator and cancels subscription.

// Stop unsubscribes from the stream and cancels subscription. Calling
// Next after calling Stop will return ErrMsgIteratorClosed error.
// All messages that are already in the buffer are discarded.
Stop()

// Drain unsubscribes from the stream and cancels subscription. All
// messages that are already in the buffer will be available on
// subsequent calls to Next. After the buffer is drained, Next will
// return ErrMsgIteratorClosed error.
Drain()
}

ConsumeContext interface {
// Stop unsubscribes from the stream and cancels subscription.
// No more messages will be received after calling this method.
// All messages that are already in the buffer are discarded.
Stop()

// Drain unsubscribes from the stream and cancels subscription.
// All messages that are already in the buffer will be processed in callback function.
Drain()
}

// MessageHandler is a handler function used as callback in [Consume]
Expand Down Expand Up @@ -97,7 +114,9 @@ type (
hbMonitor *hbMonitor
fetchInProgress uint32
closed uint32
draining uint32
done chan struct{}
drained chan struct{}
connStatusChanged chan nats.Status
fetchNext chan *pullRequest
consumeOpts *consumeOpts
Expand Down Expand Up @@ -240,6 +259,14 @@ func (p *pullConsumer) Consume(handler MessageHandler, opts ...PullConsumeOpt) (
if err != nil {
return nil, err
}
sub.subscription.SetClosedHandler(func(sid string) func(string) {
return func(subject string) {
p.Lock()
defer p.Unlock()
delete(p.subscriptions, sid)
atomic.CompareAndSwapUint32(&sub.draining, 1, 0)
}
}(sub.id))

sub.Lock()
// initial pull
Expand Down Expand Up @@ -352,6 +379,8 @@ func (p *pullConsumer) Consume(handler MessageHandler, opts ...PullConsumeOpt) (
sub.resetPendingMsgs()
}
sub.Unlock()
case <-sub.done:
return
}
}
}()
Expand Down Expand Up @@ -438,6 +467,7 @@ func (p *pullConsumer) Messages(opts ...PullMessagesOpt) (MessagesContext, error
id: consumeID,
consumer: p,
done: make(chan struct{}, 1),
drained: make(chan struct{}, 1),
msgs: msgs,
errs: make(chan error, 1),
fetchNext: make(chan *pullRequest, 1),
Expand All @@ -450,28 +480,42 @@ func (p *pullConsumer) Messages(opts ...PullMessagesOpt) (MessagesContext, error
p.Unlock()
return nil, err
}
sub.subscription.SetClosedHandler(func(sid string) func(string) {
return func(subject string) {
p.Lock()
defer p.Unlock()
if atomic.LoadUint32(&sub.draining) != 1 {
// if we're not draining, subscription can be closed as soon
// as closed handler is called
// otherwise, we need to wait until all messages are drained
// in Next
delete(p.subscriptions, sid)
}
close(msgs)
}
}(sub.id))

go func() {
<-sub.done
sub.cleanup()
}()
p.subscriptions[sub.id] = sub
p.Unlock()

go sub.pullMessages(subject)

go func() {
for {
status, ok := <-sub.connStatusChanged
if !ok {
select {
case status, ok := <-sub.connStatusChanged:
if !ok {
return
}
if status == nats.CONNECTED {
sub.errs <- errConnected
}
if status == nats.RECONNECTING {
sub.errs <- errDisconnected
}
case <-sub.done:
return
}
if status == nats.CONNECTED {
sub.errs <- errConnected
}
if status == nats.RECONNECTING {
sub.errs <- errDisconnected
}
}
}()

Expand All @@ -486,7 +530,9 @@ var (
func (s *pullSubscription) Next() (Msg, error) {
s.Lock()
defer s.Unlock()
if atomic.LoadUint32(&s.closed) == 1 {
drainMode := atomic.LoadUint32(&s.draining) == 1
closed := atomic.LoadUint32(&s.closed) == 1
if closed && !drainMode {
return nil, ErrMsgIteratorClosed
}
hbMonitor := s.scheduleHeartbeatCheck(2 * s.consumeOpts.Heartbeat)
Expand All @@ -506,8 +552,18 @@ func (s *pullSubscription) Next() (Msg, error) {
s.checkPending()
select {
case <-s.done:
drainMode := atomic.LoadUint32(&s.draining) == 1
if drainMode {
continue
}
return nil, ErrMsgIteratorClosed
case msg := <-s.msgs:
case msg, ok := <-s.msgs:
if !ok {
// if msgs channel is closed, it means that subscription was either drained or stopped
delete(s.consumer.subscriptions, s.id)
atomic.CompareAndSwapUint32(&s.draining, 1, 0)
return nil, ErrMsgIteratorClosed
}
if hbMonitor != nil {
hbMonitor.Reset(2 * s.consumeOpts.Heartbeat)
}
Expand Down Expand Up @@ -650,6 +706,21 @@ func (s *pullSubscription) Stop() {
}
}

func (s *pullSubscription) Drain() {
if !atomic.CompareAndSwapUint32(&s.closed, 0, 1) {
return
}
atomic.StoreUint32(&s.draining, 1)
close(s.done)
if s.consumeOpts.stopAfterMsgsLeft != nil {
if s.delivered >= s.consumeOpts.StopAfter {
close(s.consumeOpts.stopAfterMsgsLeft)
} else {
s.consumeOpts.stopAfterMsgsLeft <- s.consumeOpts.StopAfter - s.delivered
}
}
}

// Fetch sends a single request to retrieve given number of messages.
// It will wait up to provided expiry time if not all messages are available.
func (p *pullConsumer) Fetch(batch int, opts ...FetchOpt) (MessageBatch, error) {
Expand Down Expand Up @@ -834,18 +905,20 @@ func (s *pullSubscription) scheduleHeartbeatCheck(dur time.Duration) *hbMonitor
}

func (s *pullSubscription) cleanup() {
s.consumer.Lock()
defer s.consumer.Unlock()
if s.subscription == nil {
s.Lock()
defer s.Unlock()
if s.subscription == nil || !s.subscription.IsValid() {
return
}
if s.hbMonitor != nil {
s.hbMonitor.Stop()
}
s.subscription.Unsubscribe()
close(s.connStatusChanged)
s.subscription = nil
delete(s.consumer.subscriptions, s.id)
drainMode := atomic.LoadUint32(&s.draining) == 1
if drainMode {
s.subscription.Drain()
} else {
s.subscription.Unsubscribe()
}
atomic.StoreUint32(&s.closed, 1)
}

Expand Down
14 changes: 10 additions & 4 deletions jetstream/test/jetstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -426,10 +426,16 @@ func TestCreateStreamMirrorCrossDomains(t *testing.T) {
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

if lStream.CachedInfo().State.Msgs != 3 {
t.Fatalf("Expected 3 msgs in stream; got: %d", lStream.CachedInfo().State.Msgs)
}
checkFor(t, 2*time.Second, 15*time.Millisecond, func() error {
info, err := lStream.Info(ctx)
if err != nil {
return fmt.Errorf("Unexpected error when getting stream info: %v", err)
}
if info.State.Msgs != 3 {
return fmt.Errorf("Expected 3 msgs in stream; got: %d", lStream.CachedInfo().State.Msgs)
}
return nil
})

rjs, err := jetstream.NewWithDomain(lnc, "HUB")
if err != nil {
Expand Down
Loading

0 comments on commit 44fac59

Please sign in to comment.