Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
fix: avoid starvation in subscriptionManager
The first few fetches from Kafka may only fetch data from one or two partitions, starving the rest for a very long time (depending on message size / processing time) Once a member joins the consumer groups and receives its partitions, they are fed into the "subscription manager" from different go routines. The subscription manager then performs batching and executes a fetch for all the partitions. However, it seems like the batching logic in `subscriptionManager` is faulty, perhaps assuming that `case:` order prioritizes which `case` should be handled when all are signaled which is not the case, according to the go docs (https://golang.org/ref/spec#Select_statements): ``` If one or more of the communications can proceed, a single one that can proceed is chosen via a uniform pseudo-random selection. Otherwise, if there is a default case, that case is chosen. If there is no default case, the "select" statement blocks until at least one of the communications can proceed. ``` For example - if you receive 64 partitions, each will be handled in their own go routine which sends the partition information to the `bc.input` channel. After an iteration there is a race between `case event, ok := <-bc.input` which will batch the request and `case bc.newSubscriptions <- buffer` which will trigger an immediate fetch of the 1 or 2 partitions that made it into the batch. This issue only really affects slow consumers with short messages. If the condition happens with 1 partition being in the batch (even though 63 extra partitions have been claimed but didn't make it into the batch) the fetch will ask for 1MB (by default) of messages from that single partition. If the messages are only a few bytes long and processing time is minutes, you will not perform another fetch for hours. Contributes-to: IBM#1608 IBM#1897 Co-authored-by: Dominic Evans <[email protected]>
- Loading branch information