Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve] [pip] PIP-344 Correct the behavior of the public API pulsarClient.getPartitionsForTopic(topicName) #22182

Merged
merged 25 commits into from
Mar 25, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 127 additions & 0 deletions pip/pip-344.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
# PIP-344: Correct the behavior of the public API pulsarClient.getPartitionsForTopic(topicName)

# Background knowledge

**Topic auto-creation**
- The partitioned topic auto-creation is dependent on `pulsarClient.getPartitionsForTopic`
- It triggers partitioned metadata creation by `pulsarClient.getPartitionsForTopic`
- And triggers the topic partition creation by producers' registration and consumers' registration.
BewareMyPower marked this conversation as resolved.
Show resolved Hide resolved
- When calling `pulsarClient.getPartitionsForTopic(topicName)`, Pulsar will automatically create the partitioned topic metadata if it does not exist, either using `HttpLookupService` or `BinaryProtoLookupService`.

**Now `pulsarClient.getPartitionsForTopic`'s behavior**
| case | broker allow `auto-create` | param allow <br> `create if not exists` | non-partitioned topic | partitioned topic | current behavior |
| --- | --- | --- | --- | --- | --- |
| 1 | `true/false` | `true/false` | `exists: true` | | REST API: `partitions: 0`<br> Binary API: `partitions: 0` |
| 2 | `true/false` | `true/false` | | `exists: true` <br> `partitions: 3` | REST API: `partitions: 3`<br> Binary API: `partitions: 3` |
| 3 | `true` | `true` | | | REST API: <br> &nbsp;&nbsp;- `create new: true` <br> &nbsp;&nbsp;- `partitions: 3` <br> Binary API: <br> &nbsp;&nbsp;- `create new: true` <br> &nbsp;&nbsp;- `partitions: 3` <br> |
| 4 | `true` | `false` | | | REST API: <br> &nbsp;&nbsp;- `create new: false` <br> &nbsp;&nbsp;- `partitions: 0` <br> Binary API: <br> &nbsp;&nbsp;not support <br> |
| 5 | `false` | `true` | | | REST API: <br> &nbsp;&nbsp;- `create new: false` <br> &nbsp;&nbsp;- `partitions: 0` <br> Binary API: <br> &nbsp;&nbsp;- `create new: false` <br> &nbsp;&nbsp;- `partitions: 0` <br> |

- Broker allows `auto-create`: see also the config `allowAutoTopicCreation` in `broker.conf`.
- Param allow <br> `create if not exists`
- Regarding the HTTP API `PersistentTopics.getPartitionedMetadata`, it is an optional param which named `checkAllowAutoCreation,` and the default value is `false`.
- Regarding the `pulsar-admin` API, it depends on the HTTP API `PersistentTopics.getPartitionedMetadata`, and it always sets the param `checkAllowAutoCreation` to `false` and can not be set manually.
- Regarding the client API `HttpLookupService.getPartitionedTopicMetadata`, it depends on the HTTP API `PersistentTopics.getPartitionedMetadata`, and it always sets the param `checkAllowAutoCreation` to `true` and can not be set manually.
- Regarding the client API `BinaryProtoLookupService.getPartitionedTopicMetadata`, it always tries to create partitioned metadata.
- `REST API & HTTP API`: Since there are only two implementations of the 4 ways to get partitioned metadata, we call HTTP API `PersistentTopics.getPartitionedMetadata`, `pulsar-admin`, and `HttpLookupService.getPartitionedTopicMetadata` HTTP API, and call `BinaryProtoLookupService.getPartitionedTopicMetadata` Binary API.

# Motivation

The param `create if not exists` of the Binary API is always `true.`

- For case 4 of `pulsarClient.getPartitionsForTopic`'s behavior, it always tries to create the partitioned metadata, but the API name is `getxxx`.
- For case 5 of `pulsarClient.getPartitionsForTopic`'s behavior, it returns a `0` partitioned metadata, but the topic does not exist. For the correct behavior of this case, we had discussed [here](https://github.com/apache/pulsar/issues/8813) before.
codelipenghui marked this conversation as resolved.
Show resolved Hide resolved
- BTW, [flink-connector-pulsar](https://github.com/apache/flink-connector-pulsar/blob/main/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/ProducerRegister.java#L221-L227) is using this API to create partitioned topic metadata.

# Goals

- Regarding the case 4: Add a new API `PulsarClient.getPartitionsForTopic(String, boolean)` to support the feature that just get partitioned topic metadata and do not try to create one. See detail below.
- Regarding the case 5: Instead of returning a `0` partitioned metadata, respond to a not found error when calling `pulsarClient.getPartitionsForTopic(String)` if the topic does not exist.

# Detailed Design

## Public-facing Changes

When you call the public API `pulsarClient.getPartitionsForTopic`, pulsar will not create the partitioned metadata anymore.

### Public API
**LookupService.java**
```java

- CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(TopicName topicName);

/**
* 1. Get the partitions if the topic exists. Return "{partition: n}" if a partitioned topic exists; return "{partition: 0}" if a non-partitioned topic exists.
* 2. When {@param createIfAutoCreationEnabled} is "false," neither partitioned topic nor non-partitioned topic does not exist. You will get an {@link PulsarClientException.NotFoundException}.
* 2-1. You will get a {@link PulsarClientException.NotSupportedException} if the broker's version is an older one that does not support this feature and the Pulsar client is using a binary protocol "serviceUrl".
* 3. When {@param createIfAutoCreationEnabled} is "true," it will trigger an auto-creation for this topic(using the default topic auto-creation strategy you set for the broker), and the corresponding result is returned. For the result, see case 1.
* @version 3.3.0
*/
+ CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(TopicName topicName, boolean createIfAutoCreationEnabled);
```

The behavior of the new API `LookupService.getPartitionedTopicMetadata(TopicName, boolean)`.

| case | client-side param: `createIfAutoCreationEnabled` | non-partitioned topic | partitioned topic | broker-side: topic auto-creation strategy | current behavior |
|------|--------------------------------------------------|-----------------------|-------------------------------------|--------------------------------------------------------------------------------------------------------------------|---------------------------------------------------------------------------------------------|
| 1 | `true/false` | `exists: true` | | | REST/Binary API: `{partitions: 0}` |
| 2 | `true/false` | | `exists: true` <br> `partitions: 2` | | REST/Binary API: `{partitions: 2}` |
| 3 | `true` | | | `allowAutoTopicCreation`: `true` <be> `allowAutoTopicCreationType`: `non-partitioned` | REST/Binary API: <br> &nbsp;&nbsp;- `create new: true` <br> &nbsp;&nbsp;- `{partitions: 0}` |
| 4 | `true` | | | `allowAutoTopicCreation`: `true` <be> `allowAutoTopicCreationType`: `partitioned` <br> `defaultNumPartitions`: `2` | REST/Binary API: <br> &nbsp;&nbsp;- `create new: true` <br> &nbsp;&nbsp;- `{partitions: 2}` |
| 5 | `false` | | | `allowAutoTopicCreation`: `true` | REST/Binary API: <br> &nbsp;&nbsp;- Not found error |
| 6 | `true` | | | `allowAutoTopicCreation`: `false` | REST/Binary API: <br> &nbsp;&nbsp;- Not found error |


**PulsarClient.java**
```java
// This API existed before. Not change it, thus ensuring compatibility.
+ @Deprecated it is not suggested to use now; please use {@link #getPartitionsForTopic(TopicName, boolean)}.
- CompletableFuture<List<String>> getPartitionsForTopic(String topic);
+ default CompletableFuture<List<String>> getPartitionsForTopic(String topic) {
+ getPartitionsForTopic(topic, true);
+ }

/**
* 1. Get the partitions if the topic exists. Return "[{partition-0}, {partition-1}....{partition-n}}]" if a partitioned topic exists; return "[{topic}]" if a non-partitioned topic exists.
* 2. When {@param createIfAutoCreationEnabled} is "false", neither the partitioned topic nor non-partitioned topic does not exist. You will get an {@link PulsarClientException.NotFoundException}.
* 2-1. You will get a {@link PulsarClientException.NotSupportedException} if the broker's version is an older one that does not support this feature and the Pulsar client is using a binary protocol "serviceUrl".
* 3. When {@param createIfAutoCreationEnabled} is "true," it will trigger an auto-creation for this topic(using the default topic auto-creation strategy you set for the broker), and the corresponding result is returned. For the result, see case 1.
* @version 3.3.0
*/
CompletableFuture<List<String>> getPartitionsForTopic(String topic, boolean createIfAutoCreationEnabled);
```

The behavior of the new API `PulsarClient.getPartitionsForTopic(String, boolean)`.

| case | client-side param: `createIfAutoCreationEnabled` | non-partitioned topic | partitioned topic | broker-side: topic autp-creation strategy | current behavior |
|------|--------------------------------------------------|----------------------|-------------------------------------|--------------------------------------------------------------------------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------|
| 1 | `true/false` | `exists: true` | | | REST/Binary API: `["{tenat}/{ns}/topic"]` |
| 2 | `true/false` | | `exists: true` <br> `partitions: 2` | | REST/Binary `API`: `["{tenat}/{ns}/topic-partition-0", "{tenat}/{ns}/topic-partition-1"]` |
| 3 | `true` | | | `allowAutoTopicCreation`: `true` <be> `allowAutoTopicCreationType`: `non-partitioned` | REST/Binary API: <br> &nbsp;&nbsp;- `create new: true` <br> &nbsp;&nbsp;- `["{tenat}/{ns}/topic"]` |
| 4 | `true` | | | `allowAutoTopicCreation`: `true` <be> `allowAutoTopicCreationType`: `partitioned` <br> `defaultNumPartitions`: `2` | REST/Binary API: <br> &nbsp;&nbsp;- `create new: true` <br> &nbsp;&nbsp;- `["{tenat}/{ns}/topic-partition-0", "{tenat}/{ns}/topic-partition-1"]` |
| 5 | `false` | | | `allowAutoTopicCreation`: `true` | REST/Binary API: <br> &nbsp;&nbsp;- Not found error |
| 5 | `true` | | | `allowAutoTopicCreation`: `false` | REST/Binary API: <br> &nbsp;&nbsp;- Not found error |



### Binary protocol

**CommandPartitionedTopicMetadata**
```
message CommandPartitionedTopicMetadata {
+ optional bool metadata_auto_creation_enabled = 6 [default = true];
}
```

**FeatureFlags**
```
message FeatureFlags {
+ optional bool supports_binary_api_get_partitioned_meta_with_param_created_false = 5 [default = false];
}
```

# Backward & Forward Compatibility

- Old version client and New version Broker: The client will call the old API.

- New version client and Old version Broker: The feature flag `supports_binary_api_get_partitioned_meta_with_param_created_false` will be `false`. The client will get a not-support error if the param `createIfAutoCreationEnabled` is false.
Loading