-
Notifications
You must be signed in to change notification settings - Fork 2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Service Bus] Allow 0 prefetch and dynamically use batch size to request link credits #17546
Changes from 7 commits
fd1843a
deadef5
945f1c8
98d07dd
ef52f71
36c4302
1a5c4c4
d43457e
34c70ce
e7aa8bc
152e193
65cdcf6
c4a14f0
41d08ab
124baab
5743dcb
8986b2f
39bac24
5572e15
530e258
aee463c
bafc0e4
037cbbc
3774d66
09fca43
fe7ee04
59aee7b
8ea5ba6
24e3421
de311a8
29085d3
4c837b5
3c23542
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -34,9 +34,7 @@ class ServiceBusAsyncConsumer implements AutoCloseable { | |
this.linkProcessor = linkProcessor; | ||
this.messageSerializer = messageSerializer; | ||
this.processor = linkProcessor | ||
.map(message -> this.messageSerializer.deserialize(message, ServiceBusReceivedMessage.class)) | ||
.publish(receiverOptions.getPrefetchCount()) | ||
.autoConnect(1); | ||
Comment on lines
-38
to
-39
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why was this removed? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is removed because the publish( ) method requests the upstream (link processor) with its own request count (the reactor prefetch). With publish(), the Updated to still remove publish() from this place and updated the async client's receiveMessages() to publish and autoConnect. So the user can subscribe it more than once. For |
||
.map(message -> this.messageSerializer.deserialize(message, ServiceBusReceivedMessage.class)); | ||
} | ||
|
||
/** | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Most of the calls into Proton-j Reactor is done via ReactorDispatcher because of thread safety,
As commented here ..
https://github.com/Azure/azure-sdk-for-java/blob/master/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorDispatcher.java#L22
Would this be okay to call this api directly ? @srnagar Would this be okay here ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This needs to be tested for thread-safety. If proton-j doesn't support adding credits in a thread-safe manner we might add incorrect number of credits to the link and can potentially result in data loss if the SB mode is RECEIVE_AND_DELETE
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I simply put it into a synchronized block. The performance overhead should be minimal.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed back to addCredits()