PIP 79: Reduce redundant producers from partitioned producer

nkurihar edited this page Feb 2, 2021 · 10 revisions

Motivation

When a producer connects to a partitioned topic, it does not always need to connect to every partition, depending on its message rate, routing mode, and so on. Some example cases:

  1. Producers with different rates
    • the number of partitions should be sized for the high-rate producers
    • a low-rate producer often does not need to connect to all partitions
  2. SinglePartition routing mode
    • each producer uses only one partition

In this PIP, we reduce the number of producers so that system resources (e.g. connections between client and broker, and memory on both client and broker) are used more efficiently. The figure below illustrates the concept (the second diagram is the proposed design). As it shows, each partitioned producer connects to only a subset of the partitions, and the number of producers is less than or equal to 24.

[Figure: limit_num_of_partitions_diff_v2]

Also, this change allows us to scale the number of producers by adding partitions even when maxProducersPerTopic is set. For example, suppose the system enforces a producer limit. Currently, once a topic reaches its limit, we have to create new topics or raise the configured limit. If all producers connect using the features described here, they no longer connect to every partition, so we can accommodate more producers simply by increasing the number of partitions (without raising the max producer limit or creating new topics).
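The scaling argument above can be made concrete with a little arithmetic. The sketch below is illustrative only: ProducerCapacity and its method are hypothetical names, and it assumes that maxProducersPerTopic is enforced on each partition separately and that producers end up spread evenly across partitions. Random selection will not be perfectly even, so the result is an upper bound rather than a guarantee.

```java
/** Hypothetical helper for illustration; not part of Pulsar. */
final class ProducerCapacity {

    /**
     * Upper bound on the number of partitioned producers a topic can accept,
     * assuming the per-partition limit is maxProducersPerTopic and each
     * partitioned producer occupies one slot on partitionsPerProducer partitions.
     */
    static long maxPartitionedProducers(int numPartitions,
                                        int maxProducersPerTopic,
                                        int partitionsPerProducer) {
        if (numPartitions <= 0 || maxProducersPerTopic <= 0 || partitionsPerProducer <= 0) {
            throw new IllegalArgumentException("all arguments must be positive");
        }
        // A producer can never connect to more partitions than exist.
        int slotsPerProducer = Math.min(partitionsPerProducer, numPartitions);
        // Total slots across all partitions, divided by slots used per producer.
        long totalSlots = (long) numPartitions * maxProducersPerTopic;
        return totalSlots / slotsPerProducer;
    }
}
```

Under these assumptions, with 24 partitions and a limit of 100, producers that connect to all 24 partitions are capped at 100, while producers limited to 10 partitions each are capped at 240. Doubling the partitions to 48 raises that bound to 480 without touching the limit.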

Public Interfaces

The following changes will be introduced to a public interface:

  • Add "limit number of partitions" config to org.apache.pulsar.client.api.ProducerBuilder interface

Proposed Changes

Create limit number of partitions feature

Add a feature to the producer that limits the number of partitions it connects to.

We would like to implement a producer that can set a limit on the number of partitions. When the setting is enabled, numPartitionsLimit partitions are selected at random from all partitions, and each partitioned producer creates only numPartitionsLimit producer instances. If the producer cannot connect to a selected partition, it tries to connect to another partition instead. A sample of the interface is shown below.

Producer<byte[]> producer = pulsarClient.newProducer()
                .topic("persistent://public/default/pt")
                .numPartitionsLimit(10) // added
                .create();

Because of message ordering concerns, this feature will be allowed only in RoundRobin routing mode. (Some custom routing modes could also use the feature, but that is outside this PR's scope.)
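The selection behaviour described in this section (pick numPartitionsLimit partitions at random, and fall back to another partition when a connection fails) could be sketched as follows. This is a minimal illustration, not the proposed implementation: PartitionSelector, select, and the tryConnect callback are hypothetical names, and the real client would create producers asynchronously rather than through a boolean predicate.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.function.IntPredicate;

/** Hypothetical helper for illustration; not part of the Pulsar client API. */
final class PartitionSelector {

    /**
     * Shuffles all partition indexes, then walks the shuffled list and keeps
     * the first `limit` partitions for which `tryConnect` returns true.
     * A partition that fails to connect is skipped and the next candidate is tried.
     * The result may hold fewer than `limit` entries if too many connections fail.
     */
    static List<Integer> select(int numPartitions, int limit,
                                Random random, IntPredicate tryConnect) {
        if (numPartitions <= 0 || limit <= 0) {
            throw new IllegalArgumentException("numPartitions and limit must be positive");
        }
        List<Integer> candidates = new ArrayList<>(numPartitions);
        for (int i = 0; i < numPartitions; i++) {
            candidates.add(i);
        }
        // A uniform shuffle makes every partition equally likely to be chosen.
        Collections.shuffle(candidates, random);

        int target = Math.min(limit, numPartitions);
        List<Integer> connected = new ArrayList<>(target);
        for (int partition : candidates) {
            if (connected.size() == target) {
                break;
            }
            if (tryConnect.test(partition)) {
                connected.add(partition);
            }
        }
        return connected;
    }
}
```

Shuffling once and walking the list means a failed partition is never retried and no partition is selected twice, which keeps the fallback logic simple.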

Connect to one partition in SinglePartition routing mode

In SinglePartition routing mode, each partitioned producer uses only ONE partition to send messages. However, each partitioned producer currently creates producers for ALL partitions. We would like to eliminate such "redundant" producers.

To enable this feature, set the following config on the producer side.

Producer<byte[]> producer = pulsarClient.newProducer()
                .topic("persistent://public/default/pt")
                .messageRoutingMode(MessageRoutingMode.SinglePartition)
                .numPartitionsLimit(1) // added
                .create();

After the numPartitionsLimit feature becomes stable, we plan to change the producer so that, in SinglePartition mode, it connects to only one partition without requiring the numPartitionsLimit(1) setting.

Change PartitionedTopicStats about producer

Currently, if partitions have different sets of producers as described above, PartitionedTopicStats will be incorrect. We would like to change the logic that collects PartitionedTopicStats.

This change allows PartitionedTopicStats to be reported correctly not only when producers connect to all partitions, but also when they connect to only a subset of partitions.
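The PIP does not spell out the new collection logic. One way to aggregate correctly when each partition sees a different set of producers is to merge per-partition entries by producer name instead of relying on every partition listing the same producers. The sketch below illustrates that idea only: ProducerStatsAggregator and ProducerStat are hypothetical, simplified stand-ins rather than Pulsar's actual stats classes, and keying by producer name is an assumption.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical aggregator for illustration; not Pulsar's actual stats code. */
final class ProducerStatsAggregator {

    /** Simplified stand-in for one producer's stats on one partition. */
    static final class ProducerStat {
        final String producerName;
        final double msgRateIn;

        ProducerStat(String producerName, double msgRateIn) {
            this.producerName = producerName;
            this.msgRateIn = msgRateIn;
        }
    }

    /**
     * Sums msgRateIn per producer name across partitions. A producer that is
     * absent from some partitions simply contributes nothing for them.
     */
    static Map<String, Double> aggregateMsgRateIn(List<List<ProducerStat>> perPartition) {
        Map<String, Double> totals = new TreeMap<>();
        for (List<ProducerStat> partitionStats : perPartition) {
            for (ProducerStat stat : partitionStats) {
                // merge() inserts the value on first sight, otherwise adds to it.
                totals.merge(stat.producerName, stat.msgRateIn, Double::sum);
            }
        }
        return totals;
    }
}
```

Because the merge is keyed by name, the result does not depend on how many partitions a producer is connected to or on the order in which each partition lists its producers.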

Compatibility, Deprecation, and Migration Plan

  • What impact (if any) will there be on existing users?
    • PartitionedTopicStats will be reported correctly not only when producers connect to all partitions, but also when they connect to only a subset of partitions
  • If we are changing behavior how will we phase out the older behavior?
    • After the numPartitionsLimit feature becomes stable, we plan to change the producer so that, in SinglePartition mode, it connects to only one partition without requiring the numPartitionsLimit(1) setting
    • we would like to change the logic that collects PartitionedTopicStats, but this change is a fix for an existing issue
  • If we need special migration tools, describe them here.
    • none needed
  • When will we remove the existing behavior?
    • no existing behavior will be removed

Test Plan

We would like to validate these features with E2E test code.