NEW NATS API Drain for durable consumer #1673

Closed
jamm3e3333 opened this issue Jul 15, 2024 · 16 comments
Labels
defect Suspected defect such as a bug or regression

jamm3e3333 commented Jul 15, 2024

Observed behavior

I'm using a durable consumer as described in the docs, and I want to gracefully shut down the subscription to it. That means putting the subscription into the drain state, Nak-ing all the messages in the current pull batch, and no longer receiving new messages from the stream. The behavior I observe is strange: every time I "restart" the application (by sending SIGINT), it does Nak all the messages for the subscription to the durable consumer, but when I start the application again, not all of the Nak'ed messages are consumed again, some are duplicated, and overall the behavior is erratic.

I use the consumer from the new NATS API (the jetstream package), and the only way I can drain the subscription is through a jetstream.ConsumeContext or a jetstream.MessagesContext, i.e. only when calling the Consume or Messages method on the consumer. Maybe I'm doing something wrong; I'm not sure.

The strange thing is that with the old NATS API it worked perfectly fine.

This is my code and what I've tried:

// pkg/nats/connection.go
package nats

import (
	"context"
	"fmt"
	"time"

	"github.com/nats-io/nats.go"
	"github.com/nats-io/nats.go/jetstream"
	// ...plus the application's own logger package (import path not shown)
)

type Connection struct {
	Nc     *nats.Conn
	Js     jetstream.JetStream
	lg     logger.Logger
	stream string
}

func Init(ctx context.Context, lg logger.Logger, st string) *Connection {
	// p.Urls comes from the application's configuration (not shown).
	nc, err := nats.Connect(p.Urls,
		nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
			lg.Error("Got temporarily disconnected! Reason: %q\n", err)
		}),
		nats.ReconnectHandler(func(nc *nats.Conn) {
			lg.Error("Got reconnected to %v!\n", nc.ConnectedUrl())
		}),
		nats.ClosedHandler(func(nc *nats.Conn) {
			lg.Error("Connection closed. Reason: %q\n", nc.LastError())
		}),
		nats.ConnectHandler(func(conn *nats.Conn) {
			lg.Info("NATS has been connected!")
		}),
		nats.Timeout(10*time.Second),
	)
	if err != nil {
		lg.Fatal("Error connecting to NATS:", err)
	}
	js, err := jetstream.New(nc)
	if err != nil {
		lg.Fatal("Error returning JetStream:", err)
	}

	return &Connection{
		Nc:     nc,
		Js:     js,
		lg:     lg,
		stream: st,
	}
}

func (c *Connection) Subscribe(
	ctx context.Context,
	stream string,
	opts *jetstream.ConsumerConfig,
) error {
	con, err := c.Js.CreateOrUpdateConsumer(ctx, stream, *opts)
	if err != nil {
		return err
	}
	cctx, err := con.Consume(func(m jetstream.Msg) {
		// Each message is handled in its own goroutine: it is Nak'ed on
		// shutdown, or Ack'ed once AckWait has elapsed (simulated work).
		go func(msg jetstream.Msg, opts *jetstream.ConsumerConfig) {
			fmt.Println("MSG_HANDLE_START", msg.Subject())
			select {
			case <-ctx.Done():
				if err := msg.Nak(); err != nil {
					fmt.Println("MSG_NAK_ERR", err.Error())
				}
				fmt.Println("MSG_NAK_OK", msg.Subject())
			case <-time.After(opts.AckWait):
				if err := msg.Ack(); err != nil {
					fmt.Println("MSG_ACK_ERR", err.Error())
				}
				fmt.Println("MSG_ACK_OK", msg.Subject())
			}
		}(m, opts)
	})
	if err != nil {
		return err
	}

	// Start draining the consumer once the context is cancelled.
	go func(cctx jetstream.ConsumeContext) {
		<-ctx.Done()
		fmt.Println("DRAIN")
		cctx.Drain()
	}(cctx)

	return nil
}

// main.go
func main() {
	ctx, cancel := context.WithCancel(context.Background())

	// Cancel the context after 10s to simulate shutting down the app.
	go func() {
		<-time.After(10 * time.Second)
		cancel()
	}()

	nc := nats.Init(
		ctx,
		logger.NewLogger(),
		"my_stream",
	)

	err := nc.Subscribe(ctx, "my_stream", &jetstream.ConsumerConfig{
		Durable:       "my_unpredictable_durable_consumer",
		AckPolicy:     jetstream.AckExplicitPolicy,
		AckWait:       2 * time.Minute,
		FilterSubject: "sub.*",
	})
	if err != nil {
		panic(err)
	}
	<-ctx.Done()
	fmt.Println("shutdown signaled")
}

To test it, I published 3 unique messages right after the app started:

09:59:40 Published 1014 bytes to "sub.26bdd520-ece7-44e1-82ac-42632318f105"
09:59:41 Published 1014 bytes to "sub.f6f4f898-f158-4ce7-8d4f-c8b0fcd41167"
09:59:42 Published 1014 bytes to "sub.29b5c2d8-2f01-4acc-b28e-fd5e33815828"

After 10s the app shut down and the context cancellation propagated. I then restarted the app, observed the logs, and repeated this a couple of times. These are the app logs:

09:59:32 INF NATS has been connected!
MSG_HANDLE_START sub.26bdd520-ece7-44e1-82ac-42632318f105
MSG_HANDLE_START sub.f6f4f898-f158-4ce7-8d4f-c8b0fcd41167
MSG_HANDLE_START sub.29b5c2d8-2f01-4acc-b28e-fd5e33815828
DRAIN
09:59:42 INF shutdown signaled
MSG_NAK_OK sub.f6f4f898-f158-4ce7-8d4f-c8b0fcd41167
MSG_NAK_OK sub.29b5c2d8-2f01-4acc-b28e-fd5e33815828
MSG_HANDLE_START sub.f6f4f898-f158-4ce7-8d4f-c8b0fcd41167
MSG_HANDLE_START sub.29b5c2d8-2f01-4acc-b28e-fd5e33815828
MSG_NAK_OK sub.26bdd520-ece7-44e1-82ac-42632318f105
MSG_NAK_OK sub.29b5c2d8-2f01-4acc-b28e-fd5e33815828
MSG_NAK_OK sub.f6f4f898-f158-4ce7-8d4f-c8b0fcd41167
09:59:42 INF shutdown completed!
Process Exit with Code 0
building...
running...
09:59:52 INF NATS has been connected!
MSG_HANDLE_START sub.f6f4f898-f158-4ce7-8d4f-c8b0fcd41167
MSG_HANDLE_START sub.29b5c2d8-2f01-4acc-b28e-fd5e33815828
MSG_HANDLE_START sub.26bdd520-ece7-44e1-82ac-42632318f105
DRAIN
10:00:01 INF shutdown signaled
MSG_NAK_OK sub.26bdd520-ece7-44e1-82ac-42632318f105
MSG_NAK_OK sub.29b5c2d8-2f01-4acc-b28e-fd5e33815828
MSG_NAK_OK sub.f6f4f898-f158-4ce7-8d4f-c8b0fcd41167
10:00:01 INF shutdown completed!
Process Exit with Code 0
building...
running...
10:00:07 INF NATS has been connected!
MSG_HANDLE_START sub.f6f4f898-f158-4ce7-8d4f-c8b0fcd41167
MSG_HANDLE_START sub.26bdd520-ece7-44e1-82ac-42632318f105
MSG_HANDLE_START sub.29b5c2d8-2f01-4acc-b28e-fd5e33815828
DRAIN
10:00:16 INF shutdown signaled
MSG_NAK_OK sub.29b5c2d8-2f01-4acc-b28e-fd5e33815828
MSG_NAK_OK sub.26bdd520-ece7-44e1-82ac-42632318f105
MSG_NAK_OK sub.f6f4f898-f158-4ce7-8d4f-c8b0fcd41167
10:00:16 INF shutdown completed!
building...
running...
10:00:22 INF NATS has been connected!
MSG_HANDLE_START sub.29b5c2d8-2f01-4acc-b28e-fd5e33815828
MSG_HANDLE_START sub.26bdd520-ece7-44e1-82ac-42632318f105
MSG_HANDLE_START sub.29b5c2d8-2f01-4acc-b28e-fd5e33815828
DRAIN
10:00:31 INF shutdown signaled
MSG_NAK_OK sub.29b5c2d8-2f01-4acc-b28e-fd5e33815828
MSG_NAK_OK sub.29b5c2d8-2f01-4acc-b28e-fd5e33815828
MSG_NAK_OK sub.26bdd520-ece7-44e1-82ac-42632318f105
10:00:31 INF shutdown completed!
building...
running...
10:00:37 INF NATS has been connected!
MSG_HANDLE_START sub.26bdd520-ece7-44e1-82ac-42632318f105
MSG_HANDLE_START sub.29b5c2d8-2f01-4acc-b28e-fd5e33815828
DRAIN
10:00:47 INF shutdown signaled
MSG_NAK_OK sub.29b5c2d8-2f01-4acc-b28e-fd5e33815828
MSG_NAK_OK sub.26bdd520-ece7-44e1-82ac-42632318f105
10:00:47 INF shutdown completed!

I suspected that messages were sometimes Nak'ed before the subscription entered the drain state, possibly because I handle ctx.Done() in two places, so I changed the code to handle ctx.Done() in only one place:

	// drain is created once, outside the per-message callback;
	// sync.OnceFunc returns a function that must be called to run Drain.
	var cctx jetstream.ConsumeContext
	drain := sync.OnceFunc(func() { cctx.Drain() })
	cctx, err = con.Consume(func(m jetstream.Msg) {
		go func(msg jetstream.Msg, opts *jetstream.ConsumerConfig) {
			fmt.Println("MSG_HANDLE_START", msg.Subject())
			select {
			case <-ctx.Done():
				drain()
				if err := msg.Nak(); err != nil {
					fmt.Println("MSG_NAK_ERR", err.Error())
				}
				fmt.Println("MSG_NAK_OK", msg.Subject())
			case <-time.After(opts.AckWait):
				if err := msg.Ack(); err != nil {
					fmt.Println("MSG_ACK_ERR", err.Error())
				}
				fmt.Println("MSG_ACK_OK", msg.Subject())
			}
		}(m, opts)
	})
	if err != nil {
		return err
	}

No luck; the behavior was the same as before.
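A note on the `sync.OnceFunc` pattern used in the second attempt: `sync.OnceFunc` (Go 1.21+) returns a new function, and the wrapped function runs only when that returned function is called. To deduplicate Drain() across message handlers, the wrapper also has to be created once, outside the per-message callback. A minimal stdlib sketch, where `newDrainOnce` is a hypothetical helper name and a counter stands in for `cctx.Drain()`:

```go
package main

import (
	"fmt"
	"sync"
)

// newDrainOnce (hypothetical helper) wraps drain so it runs at most once,
// no matter how many message handlers observe the shutdown.
func newDrainOnce(drain func()) func() {
	return sync.OnceFunc(drain)
}

func main() {
	calls := 0 // stands in for cctx.Drain() being invoked
	drainOnce := newDrainOnce(func() { calls++ })

	// Constructing the wrapper alone does not run drain.
	fmt.Println("after construction:", calls)

	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			drainOnce() // every handler calls it, but drain runs only once
		}()
	}
	wg.Wait()
	fmt.Println("after three handlers:", calls)
}
```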

It seems to me that the subscription is not really in the drain state, because a message is handled again after being Nak'ed. After the first restart all 3 messages seemed to be consumed again, but after a few more restarts messages were duplicated or lost.

I'm not sure whether I'm doing something wrong, but I can't find anything about Drain in the new NATS API.

I even tried the Messages method:

	go func() {
		msgs, err := con.Messages()
		if err != nil {
			return
		}
	
		fmt.Println("PULL_SUBSCRIBE", opts.Durable)
	
		for {
			m, err := msgs.Next()
			if err != nil {
				if errors.Is(err, jetstream.ErrMsgIteratorClosed) {
					return
				}
				continue
			}
	
			go func(msg jetstream.Msg) {
				fmt.Println("MSG_HANDLE", msg.Subject())
				select {
				case <-ctx.Done():
					fmt.Println("DRAIN")
					msgs.Drain()
					if err := msg.Nak(); err != nil {
						fmt.Println("MSG_NAK_ERR", err.Error())
					}
					fmt.Println("MSG_NAK_OK", msg.Subject())
				case <-time.After(2 * time.Minute):
					if err := msg.Ack(); err != nil {
						fmt.Println("MSG_ACK_ERR", err.Error())
					}
					fmt.Println("MSG_ACK_OK", msg.Subject())
				}
			}(m)
		}
	}()

but I observed the same behavior. Basically there are only two places where I can call the Drain method: on a ConsumeContext or on a MessagesContext. Can you please check this? 🙏

I did the same implementation with the old NATS API and it worked as expected.

Expected behavior

On app shutdown, once the subscription to the durable consumer is draining and I Nak the in-flight messages, I expect each message to be Nak'ed exactly once, no new messages to be handled by the subscription, and, after the app restarts, all messages to be consumed again with none lost.

Server and client version

server: 2.9.15
client: v1.35.0

IMO the server version shouldn't matter much, because I observe different behavior with the new and the old NATS API on the same server version.

Host environment

No response

Steps to reproduce

No response

jamm3e3333 added the defect label on Jul 15, 2024
@JakubSchneller

I'm dealing with the same exact issue.

@krizacekcz

Hello, we are dealing with the same problem in production, and it's causing problems.

Jarema (Member) commented Jul 15, 2024

Thanks for detailed report!

Please note that you could not have had the same behavior in the old API, as it did not support Consume for pull consumers; it only had Fetch.

The behavior you describe was indeed a server bug that was fixed in one of the latest server versions. @piotrpio can provide details and PR links.

Please try server 2.10.17 with client 1.36.0.

@JakubSchneller @krizacekcz what version of the server are you running? Can you provide code samples?

piotrpio (Collaborator) commented Jul 15, 2024

Hello @jamm3e3333, thank you for creating the issue. In addition to what @Jarema mentioned, Drain() is non-blocking intentionally, since processing messages may take a lot of time and you don't necessarily always want to wait for it to finish. It works the same way in core NATS.

However, what we can and should add, and what should solve your issue, is a method on ConsumeContext that notifies whether the consumer has finished processing. The notification would be sent over a channel, so you can easily block after calling Drain() until the Consume() callbacks have finished processing.

It will look something like this (the method name may change; not decided yet):

go func(cctx jetstream.ConsumeContext) {
  <-ctx.Done()
  fmt.Println("DRAIN")
  cctx.Drain()
  <-cctx.Closed()     
}(cctx)

This will also allow you to add a timeout with a simple select.

Would that solution satisfy your use case?

@krizacekcz

@piotrpio you answered in the wrong thread my boy

@krizacekcz

@piotrpio, will this functionality (notifying over a channel about the drain state) be part of the next minor/patch release?

@piotrpio (Collaborator)

Oh, sorry about that, I started answering on the other issue but wanted to mention @Jarema's comment 🤦

Yes, that would be a part of the next minor release.

@krizacekcz

In production we run server 2.10.12. @piotrpio, is it possible that this was fixed in a patch release? However, we also tried locally with updated versions and the outcome was the same: we published 3 messages, manually forced a graceful shutdown (with Drain attached to ctx.Done()), and after restarting the app we either got duplicate messages or messages were lost completely. Could you give us some support, please?

@thomas-maurice

In addition to what @Jarema mentioned, Drain() is non-blocking intentionally, since processing messages may take a lot of time and you don't necessarily always want to wait for it to finish. It works the same way in core NATS.

@piotrpio this is a bit misleading, because the documentation for that function states:

Drain unsubscribes from the stream and cancels subscription. All messages that are already in
the buffer will be processed in callback function.

which can be understood as Drain being blocking.

@jamm3e3333 (Author)

@Jarema I tried with server 2.10.17 and client 1.36.0, but I observed roughly the same behavior, and eventually messages were lost after a couple of app restarts.

As you said:

Please note, that you could not have the same behavior in the old API, as it did not support Consume for pull consumers. It only had Fetch.

That's true, but draining the subscription worked properly there, and now it behaves somewhat randomly. Also, regarding the comparison of the new and old NATS APIs: why is the Drain method not available when using the Fetch method in the new NATS API? Is that on purpose, and if so, what is the reason for not having Drain when using Fetch?

Anyway, this is what I observed in the logs:

14:01:53 INF NATS has been connected!
MSG_HANDLE_START sub.2d229b4e-d3bf-4097-a11b-2bdd08af2589
MSG_HANDLE_START sub.e3798536-af06-4952-a81f-6e26f75be290
MSG_HANDLE_START sub.82e0ce01-08d1-4a5e-9413-df331b0c5726
DRAIN
MSG_NAK_OK sub.e3798536-af06-4952-a81f-6e26f75be290
14:02:02 INF shutdown signaled
MSG_NAK_OK sub.2d229b4e-d3bf-4097-a11b-2bdd08af2589
MSG_NAK_OK sub.82e0ce01-08d1-4a5e-9413-df331b0c5726
MSG_HANDLE_START sub.e3798536-af06-4952-a81f-6e26f75be290
MSG_HANDLE_START sub.82e0ce01-08d1-4a5e-9413-df331b0c5726
MSG_HANDLE_START sub.2d229b4e-d3bf-4097-a11b-2bdd08af2589
MSG_NAK_OK sub.2d229b4e-d3bf-4097-a11b-2bdd08af2589
MSG_NAK_OK sub.e3798536-af06-4952-a81f-6e26f75be290
MSG_NAK_OK sub.82e0ce01-08d1-4a5e-9413-df331b0c5726
14:02:02 INF shutdown completed!
Process Exit with Code 0
building...
running...
14:02:07 INF NATS has been connected!
MSG_HANDLE_START sub.2d229b4e-d3bf-4097-a11b-2bdd08af2589
MSG_HANDLE_START sub.e3798536-af06-4952-a81f-6e26f75be290
MSG_HANDLE_START sub.82e0ce01-08d1-4a5e-9413-df331b0c5726
14:02:07 INF start listening GRPC on port 50051
DRAIN
14:02:17 INF shutdown signaled
MSG_NAK_OK sub.e3798536-af06-4952-a81f-6e26f75be290
MSG_NAK_OK sub.82e0ce01-08d1-4a5e-9413-df331b0c5726
MSG_NAK_OK sub.2d229b4e-d3bf-4097-a11b-2bdd08af2589
14:02:17 INF shutdown completed!
building...
running...
14:02:21 INF NATS has been connected!
MSG_HANDLE_START sub.2d229b4e-d3bf-4097-a11b-2bdd08af2589
MSG_HANDLE_START sub.82e0ce01-08d1-4a5e-9413-df331b0c5726
MSG_HANDLE_START sub.e3798536-af06-4952-a81f-6e26f75be290
DRAIN
MSG_NAK_OK sub.82e0ce01-08d1-4a5e-9413-df331b0c5726
MSG_NAK_OK sub.e3798536-af06-4952-a81f-6e26f75be290
MSG_NAK_OK sub.2d229b4e-d3bf-4097-a11b-2bdd08af2589
14:02:31 INF shutdown signaled
MSG_HANDLE_START sub.2d229b4e-d3bf-4097-a11b-2bdd08af2589
MSG_HANDLE_START sub.e3798536-af06-4952-a81f-6e26f75be290
MSG_NAK_OK sub.2d229b4e-d3bf-4097-a11b-2bdd08af2589
MSG_NAK_OK sub.e3798536-af06-4952-a81f-6e26f75be290
MSG_HANDLE_START sub.82e0ce01-08d1-4a5e-9413-df331b0c5726
MSG_HANDLE_START sub.e3798536-af06-4952-a81f-6e26f75be290
MSG_HANDLE_START sub.2d229b4e-d3bf-4097-a11b-2bdd08af2589
14:02:31 INF shutdown completed!
building...
running...
14:02:35 INF NATS has been connected!
DRAIN
14:02:45 INF shutdown signaled
14:02:45 INF shutdown completed!

After the context is canceled (app shutdown), messages are Nak'ed and then handled again, I guess because Drain is non-blocking. I'm not sure, as I haven't studied the NATS Go client code in depth. The question is: is this expected behavior that requires additional handling on our side, or is it a bug, in which case we should wait for the fix from @piotrpio #1673 (comment) or stick to the old NATS API?

Or maybe we can contribute and try to fix it ourselves 🤷

@piotrpio (Collaborator)

@jamm3e3333 v2.10.17 fixed some issues with Drain(), but the core functionality in the client remains the same, so it would not fix your issue. If you need to wait until draining completes, please wait for the next release with the new feature.

That's true but the draining the subscription worked properly and now it behaves kinda random.

It is not random; it's non-blocking. Drain is non-blocking in the old API as well; you simply do not have a callback-based pull consumer there, so you end up calling Fetch() until it is drained.

In the new API there is no Drain() on Fetch() because there is no Unsubscribe() either: Fetch() in the new API is a simple request to JetStream saying "give me n messages within t amount of time", and you get either all the messages or whatever is available within the time set by the FetchMaxWait() option.

@thomas-maurice I will try to rephrase the documentation to make it clearer (although I do not think the doc implies that Drain() waits until callbacks are executed). I agree that it can be improved.

To summarize:

  1. We will improve the docs to make it clear that Drain does not wait for draining to complete.
  2. We will expose a method that allows checking whether the consumer is "Closed".

@krizacekcz

Would you have time for a quick call? The behavior is really weird. Either you will see the problem yourself, or you will get some feedback for fixing something in the next release.

jamm3e3333 (Author) commented Jul 15, 2024

Replying to @piotrpio's point that Fetch() in the new API has no Drain():

I see, but that means the new NATS API gives you no way to shut down gracefully with Fetch: you fetch a batch of messages, the app shuts down, and you cannot Nak those messages for the durable consumer. Or am I missing something?

@typecampo

I'm interested to see whether this is correct.

I'm not sure, but it looks like Consume just does a normal Subscribe on the NATS connection:

sub.subscription, err = p.jetStream.conn.Subscribe(inbox, internalHandler)

Doesn't that mean all messages published in response to the internal pull request are handled by a normal subscriber, which in turn means nc.Drain() would in fact wait for the received messages to be handled before moving to the closed state?

Of course, you would need to call wg.Done() inside the nats.ClosedHandler.

piotrpio (Collaborator) commented Aug 2, 2024

Yes, that is something you could do (as you said, in conjunction with ClosedHandler()). Bear in mind that this will drain all active subscriptions on the connection.

We're thinking of adding a Drain() on a single Fetch(). This is not ideal, since you may get redeliveries anyway if you initiate draining after the server has sent messages but before the client has received them, and that is not something the client can control. Still, I agree that this may be a valid use case.

In the meantime, I created a PR adding a Closed() method on Consume(): #1691

@piotrpio (Collaborator)

Closing this issue as ConsumeContext.Closed() was released in v1.37.0
