Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Provide a way to handle 409 Leadership Changed errors in client #1151

Merged
merged 2 commits into from
Dec 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions js.go
Original file line number Diff line number Diff line change
Expand Up @@ -2580,6 +2580,11 @@ func checkMsg(msg *Msg, checkSts, isNoWait bool) (usrMsg bool, err error) {
err = ErrConsumerDeleted
break
}

if strings.Contains(strings.ToLower(string(msg.Header.Get(descrHdr))), "leadership change") {
err = ErrConsumerLeadershipChanged
break
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, we need to revise this codepath eventually to bubble up these errors in a way that is more predictable... These errors are JetStream errors without an error code so maybe not well represented by the APIError type, what I'm thinking is that we could wrap this as a new statusError type so that below when return the error with an unexpected case.

There are other 409 status codes that we are not catching that may end up in a similar way, probably NATS/1.0 409 Message Size Exceeds MaxBytes will end up as "nats: Message Size Exceeds MaxBytes" for example.
Have a wip on trying to addressing some of this in a different PR: aa8123d

Copy link
Contributor Author

@sata-form3 sata-form3 Dec 3, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, we need to revise this codepath eventually to bubble up these errors in a way that is more predictable... These errors are JetStream errors without an error code so maybe not well represented by the APIError type, what I'm thinking is that we could wrap this as a new statusError type so that below when return the error with an unexpected case.

There are other 409 status codes that we are not catching that may end up in a similar way, probably NATS/1.0 409 Message Size Exceeds MaxBytes will end up as "nats: Message Size Exceeds MaxBytes" for example. Have a wip on trying to addressing some of this in a different PR: aa8123d

Yeah, this PR is just a stop gap measure.

hdr := []byte("NATS/1.0 409 Leadership Change\r\n\r\n")
	for reply := range o.prm {
		o.outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0))
	}
	o.prm = nil

Link

At the moment, the message header and body are just defined as byte slices. I would say this isn't very sufficient structure enough when it comes to managing changes to the protocol or supporting a better error handling without inventing a new binary data protocol. Which is of course possible, but it comes as other costs, such as implementing it in all languages there is a client library for the nats libraries to use.

It would be perhaps good to start using a serialising format which would support backwards and forward compatibility, nested data structures and has library support for a number of languages already, such as Protobufs, Avro, Thrift, or whatever that fits Nats eco system best.
Such an approach would even give the possibility of having the error codes explicitly published thus making it very easy for clients to know what errors exist to map them to canonical error types in the language the client library is written in.

fallthrough
default:
err = fmt.Errorf("nats: %s", msg.Header.Get(descrHdr))
Expand Down
3 changes: 3 additions & 0 deletions jserrors.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,9 @@ var (
// ErrConsumerDeleted is returned when attempting to send pull request to a consumer which does not exist
ErrConsumerDeleted JetStreamError = &jsError{message: "consumer deleted"}

// ErrConsumerLeadershipChanged is returned when pending requests are no longer valid after leadership has changed
ErrConsumerLeadershipChanged JetStreamError = &jsError{message: "Leadership Changed"}

// DEPRECATED: ErrInvalidDurableName is no longer returned and will be removed in future releases.
// Use ErrInvalidConsumerName instead.
ErrInvalidDurableName = errors.New("nats: invalid durable name")
Expand Down