Failed to create OrderedConsumer #1679

Open
niondir opened this issue Jul 18, 2024 · 6 comments
Labels
defect Suspected defect such as a bug or regression

Comments


niondir commented Jul 18, 2024

Observed behavior

When creating an OrderedConsumer, an error is returned: nats: API error: code=400 err_code=10084 description=consumer in pull mode requires ack policy

Expected behavior

It should be possible to create an OrderedConsumer

Server and client version

Client Tested with: v1.34.0 and v1.36.0
Server: v2.10.12 and v2.10.18

Host environment

Windows amd64

Steps to reproduce

Stream:

streamCfg := jetstream.StreamConfig{
	Description: "uplinks from devices waiting to be parsed",
	Name:        "parse",
	Subjects:    []string{"test-subject.>"},
	Storage:     jetstream.FileStorage,
	Retention:   jetstream.WorkQueuePolicy,
}

Consumer:

parseStream, err := js.Stream(context.Background(), "parse")

orderedCons, err := parseStream.OrderedConsumer(ctx, jetstream.OrderedConsumerConfig{
	FilterSubjects:    []string{"test-subject.>"},
	DeliverPolicy:     jetstream.DeliverAllPolicy,
	OptStartSeq:       0,
	OptStartTime:      nil,
	ReplayPolicy:      jetstream.ReplayInstantPolicy,
	InactiveThreshold: 0,
	HeadersOnly:       false,
	MaxResetAttempts:  0,
})

// err = "consumer in pull mode requires ack policy"

Or is it not possible to use "WorkQueuePolicy" here? If so, the error message is at least misleading.

niondir added the "defect" label Jul 18, 2024
niondir (Author) commented Jul 18, 2024

According to the public docs, an "Ordered Consumer" is push-based:

Ordered consumers are the convenient default type of push consumers designed for applications that want to efficiently consume a stream for data inspection or analysis.

See: https://docs.nats.io/nats-concepts/jetstream/consumers

According to the source documentation, the OrderedConsumer I'm trying to use is pull-based:

// OrderedConsumer returns an OrderedConsumer instance. OrderedConsumer
// are managed by the library and provide a simple way to consume
// messages from a stream. Ordered consumers are ephemeral in-memory
// pull consumers and are resilient to deletes and restarts.
OrderedConsumer(ctx context.Context, cfg OrderedConsumerConfig) (Consumer, error)


This is a little confusing and might be part of why I'm struggling to find the right solution for my problem.

piotrpio (Collaborator) commented

Hello @niondir, thanks for reporting the issue. As to the first point, your assumption was correct: it is not possible to create a pull-based ordered consumer on a workqueue stream, because ordered consumers have the AckNone policy set by default.

I created a PR to nats-server to improve the error message: nats-io/nats-server#5678

To your other point, I agree that the documentation is not up to date - it was true until the new pull-based JetStream API was introduced in the clients. In this new API we lean heavily on pull consumers, so the ordered consumer functionality is also implemented on top of a pull consumer. I'll bring it up and we'll improve the docs.
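
The rule described above can be sketched as a toy validation function. This is only a model of the behaviour reported in this thread, assuming the check applies to pull consumers with no ack policy on a work queue stream; every type and name here (Retention, ConsumerRequest, validate) is hypothetical and is not the nats-server or nats.go API, and the real server checks may be broader.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical types that model the rule from this thread.
// They are NOT the nats-server or nats.go types.
type Retention int

const (
	Limits Retention = iota
	WorkQueue
)

type AckPolicy int

const (
	AckNone AckPolicy = iota
	AckExplicit
)

// ConsumerRequest is a hypothetical, reduced consumer configuration.
type ConsumerRequest struct {
	Pull bool
	Ack  AckPolicy
}

// ErrAckRequired reuses the error text quoted in the issue.
var ErrAckRequired = errors.New("consumer in pull mode requires ack policy")

// validate rejects a pull consumer without acks on a work queue stream,
// which is the combination an ordered consumer would request.
func validate(r Retention, c ConsumerRequest) error {
	if r == WorkQueue && c.Pull && c.Ack == AckNone {
		return ErrAckRequired
	}
	return nil
}

func main() {
	ordered := ConsumerRequest{Pull: true, Ack: AckNone}      // what an ordered consumer asks for
	explicit := ConsumerRequest{Pull: true, Ack: AckExplicit} // a regular acking pull consumer

	fmt.Println(validate(WorkQueue, ordered))
	fmt.Println(validate(WorkQueue, explicit))
	fmt.Println(validate(Limits, ordered))
}
```

Under these assumptions, only the first call is rejected: the same ordered-style request passes on a limits-based stream, and a pull consumer with explicit acks passes on the work queue stream.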

somratdutta (Contributor) commented

Hi @piotrpio,
I would like to contribute to improving the NATS documentation for the new pull-based JetStream API.
Let me know if I can be of any help.

niondir (Author) commented Aug 23, 2024

Okay, so you now support both pull-based and push-based ordered consumers, but both are AckNone. Makes sense.

I'm thinking a lot about implementing a pull-based consumer with at-least-once (or even exactly-once) semantics. But it's challenging.

At-least-once could work if the stream re-published messages that were already pulled but not yet acked on each fetch, and the consumer implemented some deduplication and sorting based on message IDs for the batch.
Since a consumer already pulls again when 50% of the fetched messages are handled, it would also be beneficial to be able to tell the stream to skip N messages during a pull.

But I guess that needs some of the mentioned features on stream side.
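
The client-side half of that idea (deduplication and sorting of a fetched batch) could look roughly like the sketch below. It assumes each message carries a monotonically increasing sequence number that can serve as its ID; Msg and dedupeAndSort are hypothetical names for illustration and are not part of nats.go.

```go
package main

import (
	"fmt"
	"sort"
)

// Msg is a hypothetical stand-in for a fetched message.
// Seq plays the role of the message ID mentioned above.
type Msg struct {
	Seq  uint64
	Data string
}

// dedupeAndSort drops messages whose Seq was already seen (in an earlier
// batch or earlier in this one), records the new ones in seen, and returns
// the remaining messages ordered by Seq.
func dedupeAndSort(batch []Msg, seen map[uint64]bool) []Msg {
	out := make([]Msg, 0, len(batch))
	for _, m := range batch {
		if seen[m.Seq] {
			continue // duplicate caused by a re-publish
		}
		seen[m.Seq] = true
		out = append(out, m)
	}
	sort.Slice(out, func(i, j int) bool { return out[i].Seq < out[j].Seq })
	return out
}

func main() {
	seen := map[uint64]bool{}

	// First fetch arrives out of order.
	first := dedupeAndSort([]Msg{{3, "c"}, {1, "a"}, {2, "b"}}, seen)

	// Second fetch overlaps with the first because 2 and 3 were not acked yet.
	second := dedupeAndSort([]Msg{{2, "b"}, {4, "d"}, {3, "c"}}, seen)

	fmt.Println(first)
	fmt.Println(second)
}
```

Note that this sketch marks a message as seen as soon as it is received. For real at-least-once handling, a message would more likely be recorded only after it has been processed successfully, and the seen set would need to be bounded.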

jnmoyne (Contributor) commented Aug 24, 2024

Ordered consumers have their own 'exactly once delivery' protocol, implemented in the client library, which delivers all of the messages to your message handler callback exactly once, in order (hence the name), and without gaps or duplicates. The flip side is that if your callback code cannot process a message, there is no way to signal that (e.g. with a negative, term, or in-progress acknowledgement) so that the message gets redelivered later. In other words, it provides exactly-once delivery of the messages, not exactly-once processing.
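
As a rough illustration of how a client could enforce "no gaps, no duplicates, in order", the sketch below tracks the last delivery and asks for a restart when a gap appears. It is a simplified model under stated assumptions, not the nats.go implementation: Delivery, Tracker, and Accept are hypothetical names, and the assumption is that each delivery carries both a stream sequence and a per-consumer delivery sequence.

```go
package main

import "fmt"

// Delivery is a hypothetical stand-in for a delivered message, carrying its
// stream sequence and its per-consumer delivery sequence.
type Delivery struct {
	StreamSeq   uint64
	ConsumerSeq uint64
}

// Tracker holds the state a client would need to detect gaps.
type Tracker struct {
	lastStreamSeq   uint64
	lastConsumerSeq uint64
}

// Accept reports whether d is the next expected delivery. On a gap it
// returns false together with the stream sequence from which a freshly
// created consumer should resume, and resets the expected consumer sequence.
func (t *Tracker) Accept(d Delivery) (ok bool, restartFrom uint64) {
	if d.ConsumerSeq != t.lastConsumerSeq+1 {
		t.lastConsumerSeq = 0 // a recreated consumer starts counting again
		return false, t.lastStreamSeq + 1
	}
	t.lastConsumerSeq = d.ConsumerSeq
	t.lastStreamSeq = d.StreamSeq
	return true, 0
}

func main() {
	var t Tracker
	// The third delivery skips one message, so it must not reach the handler.
	for _, d := range []Delivery{{1, 1}, {2, 2}, {4, 4}} {
		ok, from := t.Accept(d)
		fmt.Println(d.StreamSeq, ok, from)
	}
}
```

In this model the handler only ever sees deliveries for which Accept returns true, which is why there is nothing left for the application to acknowledge.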

That means that when using an ordered consumer you can't ack messages in any way. On the other hand, that makes the ordered consumer considerably faster (higher throughput) than a consumer with explicit acks.

Work queue and interest streams are meant for 'consumption' (rather than merely reading) of the messages, so the consuming code must explicitly ack each message. Therefore you cannot (and would not want to) use an ordered consumer on a work queue or interest stream.

@somratdutta if you want to improve the documentation text inside nats.go (or the documentation web site text) you can always create a fork of the repo, commit your edits and create a PR.

niondir (Author) commented Aug 31, 2024

So basically I cannot have guaranteed consumption (work queue) and guaranteed order (ordered consumer) at the same time?

But when I think about some critical systems, you might need that. What's the solution then? Or am I missing something?

4 participants