[fix][broker] Can't connect to non-persistent topic when broker client TLS is enabled #22991

Merged
merged 4 commits into from
Jul 3, 2024

Conversation

shibd
Member

@shibd shibd commented Jul 2, 2024

Motivation

After #22838, connecting to a non-persistent topic fails when broker client TLS is enabled.

2024-07-02T17:04:47,703 - WARN  - [pulsar-io-19-4:ClientCnx] - Error during handshake
java.nio.channels.ClosedChannelException: null
	at io.netty.handler.ssl.SslHandler.channelInactive(SslHandler.java:1154) ~[netty-handler-4.1.111.Final.jar:4.1.111.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:303) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1402) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:301) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
	at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:900) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
	at io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:811) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
	at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173) ~[netty-common-4.1.111.Final.jar:4.1.111.Final]
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166) ~[netty-common-4.1.111.Final.jar:4.1.111.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469) ~[netty-common-4.1.111.Final.jar:4.1.111.Final]
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:566) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:994) ~[netty-common-4.1.111.Final.jar:4.1.111.Final]
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.111.Final.jar:4.1.111.Final]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.111.Final.jar:4.1.111.Final]
	at java.lang.Thread.run(Thread.java:833) ~[?:?]
	Suppressed: io.netty.handler.ssl.StacklessSSLHandshakeException: Connection closed while SSL/TLS handshake was in progress
		at io.netty.handler.ssl.SslHandler.channelInactive(Unknown Source) ~[netty-handler-4.1.111.Final.jar:4.1.111.Final]
2024-07-02T17:04:47,718 - INFO  - [pulsar-io-19-4:ClientCnx] - [id: 0xd356fe3f, L:/127.0.0.1:62614 ! R:localhost/127.0.0.1:62594] Disconnected
2024-07-02T17:04:47,721 - WARN  - [pulsar-io-19-4:ConnectionPool] - [[id: 0xd356fe3f, L:/127.0.0.1:62614 ! R:localhost/127.0.0.1:62594]] Connection handshake failed: org.apache.pulsar.client.api.PulsarClientException: Connection already closed
2024-07-02T17:04:47,721 - ERROR - [pulsar-io-19-4:NamespaceService] - non-persistent://my-tenant/my-ns/test-token-non-persistent Failed to get partition metadata due to redirecting fails
java.util.concurrent.CompletionException: org.apache.pulsar.client.api.PulsarClientException: Connection already closed
	at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:332) ~[?:?]
	at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:347) ~[?:?]
	at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:636) ~[?:?]
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) ~[?:?]
	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2162) ~[?:?]
	at org.apache.pulsar.client.impl.BinaryProtoLookupService.lambda$getPartitionedTopicMetadata$10(BinaryProtoLookupService.java:260) ~[classes/:?]
	at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:990) ~[?:?]
	at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:974) ~[?:?]
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) ~[?:?]
	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2162) ~[?:?]
	at org.apache.pulsar.client.impl.ConnectionPool.lambda$createConnection$12(ConnectionPool.java:293) ~[classes/:?]
	at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:990) ~[?:?]
	at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:974) ~[?:?]
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) ~[?:?]
	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2162) ~[?:?]
	at org.apache.pulsar.client.impl.ClientCnx.channelInactive(ClientCnx.java:311) ~[classes/:?]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:303) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.channelInputClosed(ByteToMessageDecoder.java:412) ~[netty-codec-4.1.111.Final.jar:4.1.111.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:377) ~[netty-codec-4.1.111.Final.jar:4.1.111.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:303) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.channelInputClosed(ByteToMessageDecoder.java:412) ~[netty-codec-4.1.111.Final.jar:4.1.111.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:377) ~[netty-codec-4.1.111.Final.jar:4.1.111.Final]
	at io.netty.handler.ssl.SslHandler.channelInactive(SslHandler.java:1172) ~[netty-handler-4.1.111.Final.jar:4.1.111.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:303) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:274) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1402) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:301) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:281) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
	at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:900) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
	at io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:811) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
	at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173) ~[netty-common-4.1.111.Final.jar:4.1.111.Final]
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166) ~[netty-common-4.1.111.Final.jar:4.1.111.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469) ~[netty-common-4.1.111.Final.jar:4.1.111.Final]
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:566) ~[netty-transport-4.1.111.Final.jar:4.1.111.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:994) ~[netty-common-4.1.111.Final.jar:4.1.111.Final]
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.111.Final.jar:4.1.111.Final]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.111.Final.jar:4.1.111.Final]
	at java.lang.Thread.run(Thread.java:833) ~[?:?]
Caused by: org.apache.pulsar.client.api.PulsarClientException: Connection already closed
	... 28 more

The reason is that the lookup always uses brokerUrl instead of brokerUrlTls.

Modifications

  • If the lookup data has a brokerUrlTls, use it.
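
The selection rule described above can be sketched roughly as follows. This is a minimal illustration, not the actual patch: `LookupUrls` and `preferredUrl` are hypothetical names standing in for the broker's lookup data, and port 6651 in the usage note is an assumed example value.

```java
/** Hypothetical stand-in for the lookup result; not the actual Pulsar class. */
final class LookupUrls {
    final String brokerUrl;     // plaintext URL, e.g. pulsar://host:6650
    final String brokerUrlTls;  // TLS URL; may be null or empty when not advertised

    LookupUrls(String brokerUrl, String brokerUrlTls) {
        this.brokerUrl = brokerUrl;
        this.brokerUrlTls = brokerUrlTls;
    }

    /** Prefer the TLS URL whenever the lookup data carries one. */
    String preferredUrl() {
        boolean hasTls = brokerUrlTls != null && !brokerUrlTls.isEmpty();
        return hasTls ? brokerUrlTls : brokerUrl;
    }
}
```

For example, `new LookupUrls("pulsar://localhost:6650", "pulsar+ssl://localhost:6651").preferredUrl()` yields the TLS URL, while a lookup result without a TLS URL falls back to the plaintext one.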

Verifying this change

  • Add testNonPersistentTopic to cover it.

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository:

shibd added the type/bug, ready-to-test, release/3.3.1, and release/3.0.6 labels Jul 2, 2024
shibd self-assigned this Jul 2, 2024
github-actions bot added the doc-not-needed label Jul 2, 2024
@nodece
Member

nodece commented Jul 2, 2024

@shibd I need to confirm two things:

Did you enable only TLS?
What is the value of brokerUrl?

Usually the brokerUrl is pulsar://xxx:6650, and using this value to connect to the broker (with both TLS and non-TLS enabled) should work fine.

@shibd
Member Author

shibd commented Jul 2, 2024

Usually the brokerUrl is pulsar://xxx:6650, and using this value to connect to the broker (with both TLS and non-TLS enabled) should work fine.

Oh, I understand this sentence now. Yes, you are right.

But in the current implementation, if broker client TLS is enabled, brokerServiceUrlTls is used to create the PulsarClient:

boolean tlsEnabled = this.getConfiguration().isBrokerClientTlsEnabled();
conf.setServiceUrl(tlsEnabled ? this.brokerServiceUrlTls : this.brokerServiceUrl);

Then, here, it uses BinaryProtoLookupService to get the partitioned topic metadata:

return pulsarClient.getLookup(brokerUrl)
        .getPartitionedTopicMetadata(topicName, false)
        .thenApply(metadata -> true)

This creates a new BinaryProtoLookupService with mismatched parameters: a non-TLS broker URL together with TLS enabled.
[Screenshot: IntelliJ IDEA 2024-07-02 21 31 50]

Then, when a new connection is created, it enters the following logic, which results in the error:

if (clientConfig.isUseTls()) {
    return toCompletableFuture(bootstrap.register())
            .thenCompose(channel -> channelInitializerHandler
                    .initTls(channel, sniHost != null ? sniHost : physicalAddress))
            .thenCompose(channelInitializerHandler::initSocks5IfConfig)
            .thenCompose(ch ->
                    channelInitializerHandler.initializeClientCnx(ch, logicalAddress,
                            unresolvedPhysicalAddress))
            .thenCompose(channel -> toCompletableFuture(channel.connect(physicalAddress)));
}
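
To make the mismatch concrete: the client takes the TLS path because TLS is enabled, yet the address it dials comes from a plaintext `pulsar://` URL, so the handshake never completes. A guard for that condition might look like the sketch below. `TlsUrlCheck` and `schemeMatchesTls` are hypothetical names for illustration and are not part of the Pulsar code base; the sketch only assumes that TLS service URLs use the `pulsar+ssl` scheme.

```java
import java.net.URI;

/** Hypothetical helper; not part of Pulsar. */
final class TlsUrlCheck {
    /** Returns true when the URL scheme agrees with the client's TLS setting. */
    static boolean schemeMatchesTls(String serviceUrl, boolean useTls) {
        String scheme = URI.create(serviceUrl).getScheme();
        boolean tlsScheme = "pulsar+ssl".equals(scheme);
        return tlsScheme == useTls;
    }
}
```

With this check, the combination described above (a `pulsar://` URL with TLS enabled) is reported as a mismatch, whereas a `pulsar+ssl://` URL with TLS enabled passes.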

@shibd
Member Author

shibd commented Jul 2, 2024

So I think the root cause is that we should not use an internal PulsarClient method here:

return pulsarClient.getLookup(brokerUrl)
        .getPartitionedTopicMetadata(topicName, false)

I think this fix ensures that the lookup uses the same connection type as the PulsarClient configuration, which avoids this issue.

In the future, we may need to refactor this logic.

/cc @poorbarcode

Member

@nodece nodece left a comment

Thanks for your explanation! LGTM

@poorbarcode
Contributor

@shibd

I think this fix ensures that the lookup uses the same connection type as the PulsarClient configuration, which avoids this issue.
In the future, we may need to refactor this logic.

So far, the method checkNonPersistentNonPartitionedTopicExists has only been used in two cases:

  • grant permission
  • get partitioned metadata

There is a retry mechanism on the client side when calling pulsarClient.getPartitionedMetadata, and granting permission is a manual operation, so it is fine.

@codecov-commenter

Codecov Report

Attention: Patch coverage is 83.33333% with 1 line in your changes missing coverage. Please review.

Project coverage is 73.45%. Comparing base (bbc6224) to head (8e47ed4).
Report is 432 commits behind head on master.

Additional details and impacted files

@@             Coverage Diff              @@
##             master   #22991      +/-   ##
============================================
- Coverage     73.57%   73.45%   -0.12%     
- Complexity    32624    33289     +665     
============================================
  Files          1877     1908      +31     
  Lines        139502   143001    +3499     
  Branches      15299    15589     +290     
============================================
+ Hits         102638   105041    +2403     
- Misses        28908    29921    +1013     
- Partials       7956     8039      +83     
Flag         Coverage Δ
inttests     27.43% <50.00%> (+2.85%) ⬆️
systests     24.70% <0.00%> (+0.38%) ⬆️
unittests    72.49% <83.33%> (-0.36%) ⬇️

Flags with carried forward coverage won't be shown.

Files                                                    Coverage Δ
...ache/pulsar/broker/namespace/NamespaceService.java    73.72% <83.33%> (+1.49%) ⬆️

... and 475 files with indirect coverage changes

@shibd shibd merged commit deb26f7 into apache:master Jul 3, 2024
51 checks passed
shibd added a commit that referenced this pull request Jul 3, 2024
shibd added a commit that referenced this pull request Jul 3, 2024
nikhil-ctds pushed a commit to datastax/pulsar that referenced this pull request Jul 10, 2024
…client tls (apache#22991)

(cherry picked from commit deb26f7)
(cherry picked from commit 998bd90)
srinath-ctds pushed a commit to datastax/pulsar that referenced this pull request Jul 15, 2024
…client tls (apache#22991)

(cherry picked from commit deb26f7)
(cherry picked from commit 998bd90)
@lhotari lhotari added this to the 4.0.0 milestone Oct 14, 2024
9 participants