
[fix][broker][branch-3.1] Avoid PublishRateLimiter use an already closed RateLimiter #22011

Merged
2 commits merged into apache:branch-3.1 on Feb 19, 2024

Conversation

@coderzc coderzc (Member) commented Feb 1, 2024

Motivation

We found the following error logs on the broker when using ResourceGroupPublishLimiter. The root cause is a race condition between the tryAcquire method and the replaceLimiters method, which can leave the publish rate limiter calling tryAcquire on an already closed RateLimiter. PrecisePublishLimiter has the same issue. A simplified sketch of the race follows the stack trace below.

[pulsar-io-4-54] WARN  org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.6:59173] Got exception java.lang.IllegalArgumentException: Rate limiter is already shutdown
 at com.google.common.base.Preconditions.checkArgument(Preconditions.java:145)
 at org.apache.pulsar.common.util.RateLimiter.tryAcquire(RateLimiter.java:176)
 at org.apache.pulsar.broker.resourcegroup.ResourceGroupPublishLimiter.tryAcquire(ResourceGroupPublishLimiter.java:138)
 at org.apache.pulsar.broker.service.AbstractTopic.isResourceGroupPublishRateExceeded(AbstractTopic.java:1036)
 at org.apache.pulsar.broker.service.ServerCnx.startSendOperation(ServerCnx.java:2611)
 at org.apache.pulsar.broker.service.ServerCnx.handleSend(ServerCnx.java:1508)
 at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:207)
 at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
 at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
 at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
 at io.netty.handler.flow.FlowControlHandler.dequeue(FlowControlHandler.java:200)
 at io.netty.handler.flow.FlowControlHandler.channelRead(FlowControlHandler.java:162)
 at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
 at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
 at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
 at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)
 at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:333)
 at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:454)
 at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290)
 at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
 at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
 at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
 at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
 at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
 at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
 at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
 at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:800)
 at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:499)
 at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:397)
 at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
 at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
 at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
 at java.base/java.lang.Thread.run(Thread.java:829)
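
The following is a simplified, hypothetical sketch of the race (class, field, and method names are illustrative stand-ins, not the actual Pulsar code): thread A reads the current limiter reference and calls tryAcquire on it, while thread B concurrently runs replaceLimiters, installing a new limiter and closing the old one that A is still holding.

// RateLimiterStub stands in for org.apache.pulsar.common.util.RateLimiter in this sketch.
class RateLimiterStub {
    private final long permitsPerSecond;
    private volatile boolean closed;

    RateLimiterStub(long permitsPerSecond) {
        this.permitsPerSecond = permitsPerSecond;
    }

    boolean tryAcquire(long permits) {
        if (closed) {
            // Mirrors checkArgument(!isClosed(), "Rate limiter is already shutdown")
            throw new IllegalArgumentException("Rate limiter is already shutdown");
        }
        return permits <= permitsPerSecond;
    }

    boolean isClosed() {
        return closed;
    }

    void close() {
        closed = true;
    }
}

// Hypothetical publish limiter showing the interleaving that triggers the error.
class PublishLimiterRaceSketch {
    private volatile RateLimiterStub limiterOnMessage = new RateLimiterStub(1000);

    boolean tryAcquire(int numMessages) {
        RateLimiterStub current = limiterOnMessage;   // thread A reads the old reference
        // ... thread B may run replaceLimiters(...) right here ...
        return current.tryAcquire(numMessages);       // throws "Rate limiter is already shutdown"
    }

    void replaceLimiters(long newRate) {
        RateLimiterStub old = limiterOnMessage;
        limiterOnMessage = new RateLimiterStub(newRate); // thread B installs the new limiter
        if (old != null) {
            old.close();                                 // thread B closes the limiter A still holds
        }
    }
}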

Modifications

If the current RateLimiter is already shut down, tryAcquire now simply returns true and prints an info log instead of throwing. Since PIP-322 refactored Pulsar's rate limiting in version 3.2, this fix is only needed for branches before 3.2. A simplified sketch of this approach is shown below.
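
The sketch below illustrates this approach (names are illustrative, not the exact patch; RateLimiterStub is the stand-in class defined in the earlier sketch): when the underlying limiter has already been closed by a concurrent replaceLimiters, the acquire is treated as successful and an info-level log line is emitted, so the publish is not rejected.

import java.util.logging.Logger;

class TolerantPublishLimiterSketch {
    private static final Logger LOG = Logger.getLogger(TolerantPublishLimiterSketch.class.getName());

    private volatile RateLimiterStub limiterOnMessage;   // swapped by replaceLimiters(...)

    boolean tryAcquire(int numMessages) {
        RateLimiterStub current = limiterOnMessage;
        if (current == null) {
            return true;                                  // no publish rate limit configured
        }
        if (current.isClosed()) {
            LOG.info("Rate limiter is already shutdown, allowing the publish");
            return true;                                  // the replacement limiter will enforce the rate
        }
        return current.tryAcquire(numMessages);
    }
}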

Verifying this change

  • Make sure that the change passes the CI checks.

(Please pick either of the following options)

This change is a trivial rework / code cleanup without any test coverage.

(or)

This change is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end deployment with large payloads (10MB)
  • Extended integration test for recovery after broker failure

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository:

@github-actions github-actions bot added the doc-not-needed (Your PR changes do not impact docs) label on Feb 1, 2024
@frankjkelly frankjkelly (Contributor) commented Feb 1, 2024

Thanks @coderzc - does this mean that if we're getting this error that rate limiting is or is not working?
Are producers impacted by this successfully producing with or without the rate limit? please advise.

@lhotari lhotari (Member) left a comment

I think it's risky to add yet another synchronized method. We have seen these introduce performance regressions and deadlocks in the past. For the shutdown case, an easy solution would be to add an explicit RuntimeException class for the shutdown and simply catch it where it occurs. That would be a low-risk change.
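
A minimal sketch of that alternative, assuming hypothetical class and method names (none of these exist in Pulsar): tryAcquire would throw a dedicated unchecked exception when the limiter is closed, and the caller would catch exactly that type instead of relying on another synchronized method.

class RateLimiterAlreadyClosedException extends RuntimeException {
    RateLimiterAlreadyClosedException(String message) {
        super(message);
    }
}

class ClosableLimiterSketch {
    private final long permitsPerSecond;
    private volatile boolean closed;

    ClosableLimiterSketch(long permitsPerSecond) {
        this.permitsPerSecond = permitsPerSecond;
    }

    boolean tryAcquire(long permits) {
        if (closed) {
            // Dedicated exception type instead of the generic IllegalArgumentException.
            throw new RateLimiterAlreadyClosedException("Rate limiter is already shutdown");
        }
        return permits <= permitsPerSecond;
    }

    void close() {
        closed = true;
    }
}

class CallerSketch {
    // The caller catches only the shutdown case and lets the publish through.
    static boolean isPublishRateExceeded(ClosableLimiterSketch limiter, int numMessages) {
        try {
            return !limiter.tryAcquire(numMessages);
        } catch (RateLimiterAlreadyClosedException e) {
            return false;   // limiter is being replaced; do not throttle this publish
        }
    }
}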

@lhotari lhotari (Member) left a comment

@coderzc A better fix is to remove the exception that is thrown when the rate limiter is closed.
Simply return if it is closed. This could be logged at info level.

checkArgument(!isClosed(), "Rate limiter is already shutdown");

@coderzc coderzc (Member, Author) commented Feb 5, 2024

Thanks @coderzc - does this mean that if we're getting this error that rate limiting is or is not working? Are producers impacted by this successfully producing with or without the rate limit? please advise.

@frankjkelly When we get this error log, the send operation fails temporarily (the send eventually times out). At that moment the rate limiter is being updated; once the update completes, publishing continues to work normally.

@coderzc coderzc (Member, Author) commented Feb 5, 2024

@coderzc A better fix is to remove the exception that is thrown when the rate limiter is closed. Simply return if it is closed. This could be logged at info level.

checkArgument(!isClosed(), "Rate limiter is already shutdown");

@lhotari Good idea. If the current RateLimiter is already shut down, we now simply return true and print an info log. PTAL~

@frankjkelly frankjkelly (Contributor) commented:

Thanks @coderzc - does this mean that if we're getting this error that rate limiting is or is not working? Are producers impacted by this successfully producing with or without the rate limit? please advise.

@frankjkelly When we get this error log, the send operation fails temporarily (the send eventually times out). At that moment the rate limiter is being updated; once the update completes, publishing continues to work normally.

Thanks for the clarification so does that mean the client will retry and if so is that within milliseconds, or seconds or something else?

@coderzc coderzc (Member, Author) commented Feb 9, 2024

Thanks for the clarification so does that mean the client will retry and if so is that within milliseconds, or seconds or something else?

@frankjkelly I think the client does not retry automatically; the user needs to resend the message manually if the send fails.

@frankjkelly frankjkelly (Contributor) commented:

Thanks for the clarification so does that mean the client will retry and if so is that within milliseconds, or seconds or something else?

@frankjkelly I think the client does not retry automatically; the user needs to resend the message manually if the send fails.

Hmmm @merlimat or @lhotari can you confirm? If this error requires the caller to catch and retry (as opposed to the client doing it internally) then that's a concern for adoption of the rate limiter (if the error occurs and the client retries as best it can that's OK).

@lhotari lhotari (Member) commented Feb 9, 2024

Thanks for the clarification so does that mean the client will retry and if so is that within milliseconds, or seconds or something else?

@frankjkelly I think the client does not retry automatically; the user needs to resend the message manually if the send fails.

Hmmm @merlimat or @lhotari can you confirm? If this error requires the caller to catch and retry (as opposed to the client doing it internally) then that's a concern for adoption of the rate limiter (if the error occurs and the client retries as best it can that's OK).

I don't see anything special about rate limiters in message delivery and retries. The Pulsar client is designed to continue attempting to send messages until a potential send timeout occurs. It's also possible to set up an unlimited send timeout, allowing the client to retry indefinitely. This feature is detailed in the Pulsar documentation, available at https://pulsar.apache.org/docs/3.1.x/cookbooks-deduplication/#pulsar-clients (it's explained in the context of message deduplication). You can refer to the Javadocs for sendTimeout on ProducerBuilder.

It's crucial for messaging applications to be equipped to handle potential failures in message delivery, especially when data consistency is a key concern. Once the Pulsar client has acknowledged the message as sent by returning the message id, the responsibility for maintaining and ensuring the delivery of the message shifts to Pulsar. It's also necessary to verify that the message id is returned when using the asynchronous API (sendAsync).

If sending results in an error or the messaging application never receives a message id from the Pulsar client, it's the messaging application's responsibility to retry.
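
As an illustration of these points (the service URL and topic below are placeholders): a producer configured with sendTimeout(0, TimeUnit.SECONDS) has no send timeout, so the client keeps retrying internally, and the application treats a message as delivered only once a MessageId is returned, handling the failure itself otherwise.

import java.util.concurrent.TimeUnit;

import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;

public class ProducerRetrySketch {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")                    // placeholder service URL
                .build();

        Producer<String> producer = client.newProducer(Schema.STRING)
                .topic("persistent://public/default/rate-limited-topic") // placeholder topic
                .sendTimeout(0, TimeUnit.SECONDS)                         // 0 = no send timeout, retry indefinitely
                .create();

        producer.sendAsync("hello")
                .thenAccept(messageId -> System.out.println("Delivered: " + messageId))
                .exceptionally(ex -> {
                    // No MessageId was returned: the application is responsible for retrying.
                    System.err.println("Send failed, application should retry: " + ex);
                    return null;
                });

        producer.flush();
        producer.close();
        client.close();
    }
}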

@frankjkelly, did I answer your question?

@frankjkelly frankjkelly (Contributor) commented:

You did - Thanks @lhotari

@lhotari lhotari merged commit 8cce14c into apache:branch-3.1 Feb 19, 2024
37 of 40 checks passed
coderzc added a commit that referenced this pull request Feb 19, 2024
coderzc added a commit that referenced this pull request Feb 19, 2024
coderzc added a commit that referenced this pull request Feb 19, 2024
nodece pushed a commit to nodece/pulsar that referenced this pull request Feb 23, 2024
mukesh-ctds pushed a commit to datastax/pulsar that referenced this pull request Mar 1, 2024
mukesh-ctds pushed a commit to datastax/pulsar that referenced this pull request Mar 6, 2024
4 participants