Replies: 6 comments 17 replies
-
I would suggest using a chan subscriber, then you can select on them all. https://github.com/nats-io/nats.go/blob/main/test/sub_test.go#L799 |
Beta Was this translation helpful? Give feedback.
-
To clarify what I want to achieve I prepared the simplest worker pool implementation I can come up with (see the bottom of this message; please note, that although it can be used for simple tests, I omitted error handling and graceful shutdown; the real-live implementation would dispatch messages to handlers based on messages’ subjects). Like I said, this solution works well if it is ok to create a separate pool for each subscription. And while in many cases this is in fact ok, sometimes that can lead to non optimal resource utilization. I dug a bit in nats internals and I think the fetching from multiple subscription could be implemented as follows (the example assumes two subscription, two available workers (hence
I haven’t yet implemented this in Go, but from what I have observed using Can anyone confirm if this is the right approach? Worker pool implementation
|
Beta Was this translation helpful? Give feedback.
-
Is the work cpu or io bound? How long does it take to process? Are you trying to optimize using all cores and minimizing latency? |
Beta Was this translation helpful? Give feedback.
-
In this message I elaborate on the original problem of how to efficiently fetch messages from more than one subscription. I present possible solutions and discuss their drawbacks. I build on the example of a worker pool from one of my previous messages. @derekcollison The problem with fetching from multiple subscriptions is that we don't know upfront which of them has messages to be consumed. We need to fetch from subscriptions to find out. We can do this concurrently (as in the first example implementation below) or sequentially (second example). Both approaches have their drawbacks which I point out below. As a reminder, the worker pool has a run method responsible for spawning workers and fetching tasks for them:
Lets modify it so that it operates not on a single subscription, but on a group of subscriptions (this is the only modification needed when it comes to worker pool implementation):
Note that, when the pool calls the Fetch method it passes it the number of idle workers as a
In the above example we fetch messages concurrently, which may lead to the situation where we have fetched more messages than we can process, so we need to push them back (by Bellow is the alternative implementation where subscriptions are fetched sequentially:
The problem here is that in the worst scenario we need to wait |
Beta Was this translation helpful? Give feedback.
-
IntroWith this message I start a discussion on how fetching from multiple subscriptions/consumers can be implemented in the library. First I propose two possible APIs extensions that will enable users to request new behavior. Then I describe how pulling from a consumer actually works and what changes are required to implement the discussed functionality. I focus on the protocol and messages exchanged between the library and the server, and on showing how we can use what the server and protocol already provide to achieve the goal. On required API changeWe clearly need a notion of a subscription group in the API. Otherwise how do we enable the user to express the will of fetching from multiple subscriptions? I see two possible approaches: Option 1 - new
|
Beta Was this translation helpful? Give feedback.
-
Why not flow controlled push with the callback putting all messages in a single channel? |
Beta Was this translation helpful? Give feedback.
-
I have pool of goroutines to handle messages from NATS. I only want to fetch a message from NATS is there is a free goroutine in the pool.
In the code this may look like (pseudocode):
This works if there is only one subscription. If there is more than one I need to select subscription that has undelivered message:
So the question is how to implement
selectNatsSubWithAwaitingMessage()
function.Beta Was this translation helpful? Give feedback.
All reactions