Skip to content

Commit

Permalink
[Service Bus] Allow 0 prefetch and dynamically use batch size to requ…
Browse files Browse the repository at this point in the history
…est link credits (#17546)
  • Loading branch information
YijunXieMS authored Nov 20, 2020
1 parent b7bba68 commit ad184d3
Show file tree
Hide file tree
Showing 14 changed files with 146 additions and 108 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public interface AmqpReceiveLink extends AmqpLink {
Flux<Message> receive();

/**
* Adds the specified number of credits to the link.
* Schedule to adds the specified number of credits to the link.
*
* The number of link credits initialises to zero. It is the application's responsibility to call this method to
* allow the receiver to receive {@code credits} more deliveries.
Expand All @@ -34,6 +34,21 @@ public interface AmqpReceiveLink extends AmqpLink {
*/
void addCredits(int credits);

/**
* Adds the specified number of credits to the link.
*
* The number of link credits initialises to zero. It is the application's responsibility to call this method to
* allow the receiver to receive {@code credits} more deliveries.
*
* It will update the credits in local memory instantly so {@link #getCredits()} will get
* the updated credits immediately. But the service side may get the credits added with a latency.
* As a contrast, {@link #getCredits()} may return an unchanged value for a short while after
* {@link #addCredits(int)} is called to schedule the credit addition and before the job dispatcher executes it.
*
* @param credits Number of credits to add to the receive link.
*/
void addCreditsInstantly(int credits);

/**
* Gets the current number of credits this link has.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,11 @@ public void addCredits(int credits) {
}
}

@Override
public void addCreditsInstantly(int credits) {
receiver.flow(credits);
}

@Override
public int getCredits() {
return receiver.getRemoteCredit();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,7 @@ class ServiceBusAsyncConsumer implements AutoCloseable {
this.linkProcessor = linkProcessor;
this.messageSerializer = messageSerializer;
this.processor = linkProcessor
.map(message -> this.messageSerializer.deserialize(message, ServiceBusReceivedMessage.class))
.publish(receiverOptions.getPrefetchCount())
.autoConnect(1);
.map(message -> this.messageSerializer.deserialize(message, ServiceBusReceivedMessage.class));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public final class ServiceBusClientBuilder {

// Using 0 pre-fetch count for both receive modes, to avoid message lock lost exceptions in application
// receiving messages at a slow rate. Applications can set it to a higher value if they need better performance.
private static final int DEFAULT_PREFETCH_COUNT = 1;
private static final int DEFAULT_PREFETCH_COUNT = 0;
private static final String NAME_KEY = "name";
private static final String VERSION_KEY = "version";
private static final String UNKNOWN = "UNKNOWN";
Expand Down Expand Up @@ -671,11 +671,13 @@ public ServiceBusSessionProcessorClientBuilder maxConcurrentSessions(int maxConc

/**
* Sets the prefetch count of the processor. For both {@link ServiceBusReceiveMode#PEEK_LOCK PEEK_LOCK} and
* {@link ServiceBusReceiveMode#RECEIVE_AND_DELETE RECEIVE_AND_DELETE} modes the default value is 1.
* {@link ServiceBusReceiveMode#RECEIVE_AND_DELETE RECEIVE_AND_DELETE} modes the default value is 0.
*
* Prefetch speeds up the message flow by aiming to have a message readily available for local retrieval when
* and before the application starts the processor.
* Setting a non-zero value will prefetch that number of messages. Setting the value to zero turns prefetch off.
* Using a non-zero prefetch risks of losing messages even though it has better performance.
* @see <a href="https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-prefetch">Service Bus Prefetch</a>
*
* @param prefetchCount The prefetch count.
*
Expand Down Expand Up @@ -1446,9 +1448,9 @@ ServiceBusReceiverAsyncClient buildAsyncClient(boolean isAutoCompleteAllowed) {
}

private void validateAndThrow(int prefetchCount) {
if (prefetchCount < 1) {
if (prefetchCount < 0) {
throw logger.logExceptionAsError(new IllegalArgumentException(String.format(
"prefetchCount (%s) cannot be less than 1.", prefetchCount)));
"prefetchCount (%s) cannot be less than 0.", prefetchCount)));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -573,7 +573,18 @@ Flux<ServiceBusReceivedMessage> peekMessagesAt(int maxMessages, long sequenceNum
* @return An <b>infinite</b> stream of messages from the Service Bus entity.
*/
public Flux<ServiceBusReceivedMessage> receiveMessages() {
return receiveMessagesWithContext()
// Without limitRate(), if the user calls receiveMessages().subscribe(), it will call
// ServiceBusReceiveLinkProcessor.request(long request) where request = Long.MAX_VALUE.
// We turn this one-time non-backpressure request to continuous requests with backpressure.
// If receiverOptions.prefetchCount is set to non-zero, it will be passed to ServiceBusReceiveLinkProcessor
// to auto-refill the prefetch buffer. A request will retrieve one message from this buffer.
// If receiverOptions.prefetchCount is 0 (default value),
// the request will add a link credit so one message is retrieved from the service.
return receiveMessagesNoBackPressure().limitRate(1, 0);
}

Flux<ServiceBusReceivedMessage> receiveMessagesNoBackPressure() {
return receiveMessagesWithContext(0)
.handle((serviceBusMessageContext, sink) -> {
if (serviceBusMessageContext.hasError()) {
sink.error(serviceBusMessageContext.getThrowable());
Expand All @@ -598,6 +609,10 @@ public Flux<ServiceBusReceivedMessage> receiveMessages() {
* @return An <b>infinite</b> stream of messages from the Service Bus entity.
*/
Flux<ServiceBusMessageContext> receiveMessagesWithContext() {
return receiveMessagesWithContext(1);
}

Flux<ServiceBusMessageContext> receiveMessagesWithContext(int highTide) {
final Flux<ServiceBusMessageContext> messageFlux = sessionManager != null
? sessionManager.receive()
: getOrCreateConsumer().receive().map(ServiceBusMessageContext::new);
Expand All @@ -610,16 +625,19 @@ Flux<ServiceBusMessageContext> receiveMessagesWithContext() {
withAutoLockRenewal = messageFlux;
}

final Flux<ServiceBusMessageContext> withAutoComplete;
Flux<ServiceBusMessageContext> result;
if (receiverOptions.isEnableAutoComplete()) {
withAutoComplete = new FluxAutoComplete(withAutoLockRenewal, completionLock,
result = new FluxAutoComplete(withAutoLockRenewal, completionLock,
context -> context.getMessage() != null ? complete(context.getMessage()) : Mono.empty(),
context -> context.getMessage() != null ? abandon(context.getMessage()) : Mono.empty());
} else {
withAutoComplete = withAutoLockRenewal;
result = withAutoLockRenewal;
}

return withAutoComplete
if (highTide > 0) {
result = result.limitRate(highTide, 0);
}
return result
.onErrorMap(throwable -> mapError(throwable, ServiceBusErrorSource.RECEIVE));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -575,12 +575,12 @@ public void rollbackTransaction(ServiceBusTransactionContext transactionContext)
*/
@Override
public void close() {
asyncClient.close();

SynchronousMessageSubscriber messageSubscriber = synchronousMessageSubscriber.getAndSet(null);
if (messageSubscriber != null && !messageSubscriber.isDisposed()) {
messageSubscriber.dispose();
}

asyncClient.close();
}

/**
Expand All @@ -590,19 +590,20 @@ public void close() {
private void queueWork(int maximumMessageCount, Duration maxWaitTime,
FluxSink<ServiceBusReceivedMessage> emitter) {
final long id = idGenerator.getAndIncrement();
final SynchronousReceiveWork work = new SynchronousReceiveWork(id, maximumMessageCount, maxWaitTime, emitter);

final int prefetch = asyncClient.getReceiverOptions().getPrefetchCount();
final int toRequest = prefetch != 0 ? Math.min(maximumMessageCount, prefetch) : maximumMessageCount;
final SynchronousReceiveWork work = new SynchronousReceiveWork(id,
toRequest,
maxWaitTime, emitter);
SynchronousMessageSubscriber messageSubscriber = synchronousMessageSubscriber.get();
if (messageSubscriber == null) {
long prefetch = asyncClient.getReceiverOptions().getPrefetchCount();
SynchronousMessageSubscriber newSubscriber = new SynchronousMessageSubscriber(prefetch, work);

SynchronousMessageSubscriber newSubscriber = new SynchronousMessageSubscriber(toRequest, work);
if (!synchronousMessageSubscriber.compareAndSet(null, newSubscriber)) {
newSubscriber.dispose();
SynchronousMessageSubscriber existing = synchronousMessageSubscriber.get();
existing.queueWork(work);
} else {
asyncClient.receiveMessages().subscribeWith(newSubscriber);
asyncClient.receiveMessagesNoBackPressure().subscribeWith(newSubscriber);
}
} else {
messageSubscriber.queueWork(work);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ private Flux<ServiceBusMessageContext> getSession(Scheduler scheduler, boolean d
onSessionRequest(1L);
}
}))
.publishOn(scheduler);
.publishOn(scheduler, 1);
}

private Mono<ServiceBusManagementNode> getManagementNode() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,23 @@ class ServiceBusSessionReceiver implements AutoCloseable {
this.receiveLink = receiveLink;
this.lockContainer = new LockContainer<>(ServiceBusConstants.OPERATION_TIMEOUT);

receiveLink.setEmptyCreditListener(() -> 1);
receiveLink.setEmptyCreditListener(() -> 0);

final Flux<ServiceBusMessageContext> receivedMessagesFlux = receiveLink
.receive()
.publishOn(scheduler)
.doOnSubscribe(subscription -> {
logger.verbose("Adding prefetch to receive link.");
receiveLink.addCredits(prefetch);
if (prefetch > 0) {
receiveLink.addCredits(prefetch);
}
})
.doOnRequest(request -> { // request is of type long.
if (prefetch == 0) { // add "request" number of credits
receiveLink.addCredits((int) request);
} else { // keep total credits "prefetch" if prefetch is not 0.
receiveLink.addCredits(Math.max(0, prefetch - receiveLink.getCredits()));
}
})
.takeUntilOther(cancelReceiveProcessor)
.map(message -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,6 @@ protected void hookOnSubscribe(Subscription subscription) {

if (Operators.setOnce(UPSTREAM, this, subscription)) {
this.subscription = subscription;
remaining.addAndGet(requested);
subscription.request(requested);
subscriberInitialized = true;
drain();
} else {
Expand Down Expand Up @@ -140,7 +138,6 @@ private void drainQueue() {

while ((currentWork = workQueue.peek()) != null
&& (!currentWork.isProcessingStarted() || bufferMessages.size() > 0)) {

// Additional check for safety, but normally this work should never be terminal
if (currentWork.isTerminal()) {
// This work already finished by either timeout or no more messages to send, process next work.
Expand All @@ -155,6 +152,9 @@ private void drainQueue() {
// timer to complete the currentWork in case of timeout trigger
currentTimeoutOperation = getTimeoutOperation(currentWork);
currentWork.startedProcessing();
final long calculatedRequest = currentWork.getNumberOfEvents() - remaining.get();
remaining.addAndGet(calculatedRequest);
subscription.request(calculatedRequest);
}

// Send messages to currentWork from buffer
Expand All @@ -174,15 +174,6 @@ private void drainQueue() {
currentTimeoutOperation.dispose();
}
logger.verbose("The work [{}] is complete.", currentWork.getId());
} else {
// Since this work is not complete, find out how much we should request from upstream
long creditToAdd = currentWork.getRemaining() - (remaining.get() + bufferMessages.size());
if (creditToAdd > 0) {
remaining.addAndGet(creditToAdd);
subscription.request(creditToAdd);
logger.verbose("Requesting [{}] from upstream for work [{}].", creditToAdd,
currentWork.getId());
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ public class ServiceBusReceiveLinkProcessor extends FluxProcessor<ServiceBusRece
private final Object queueLock = new Object();
private final AtomicBoolean isTerminated = new AtomicBoolean();
private final AtomicInteger retryAttempts = new AtomicInteger();
private final AtomicBoolean linkCreditsAdded = new AtomicBoolean();
private final AtomicReference<String> linkName = new AtomicReference<>();

// Queue containing all the prefetched messages.
Expand Down Expand Up @@ -200,12 +199,7 @@ public void onNext(ServiceBusReceiveLink next) {
oldSubscription = currentLinkSubscriptions;

currentLink = next;
next.setEmptyCreditListener(() -> {
final int creditsToAdd = getCreditsToAdd(0);
linkCreditsAdded.set(creditsToAdd > 0);

return creditsToAdd;
});
next.setEmptyCreditListener(() -> 0);

currentLinkSubscriptions = Disposables.composite(
next.receive().publishOn(Schedulers.boundedElastic()).subscribe(message -> {
Expand Down Expand Up @@ -499,6 +493,9 @@ private void drainQueue() {
if (receiveMode != ServiceBusReceiveMode.PEEK_LOCK) {
pendingMessages.decrementAndGet();
}
if (prefetch > 0) { // re-fill messageQueue if there is prefetch configured.
checkAndAddCredits(currentLink);
}
} catch (Exception e) {
logger.error("Exception occurred while handling downstream onNext operation.", e);
throw logger.logExceptionAsError(Exceptions.propagate(
Expand Down Expand Up @@ -545,18 +542,14 @@ private void checkAndAddCredits(AmqpReceiveLink link) {
return;
}

// Credits have already been added to the link. We won't try again.
if (linkCreditsAdded.getAndSet(true)) {
return;
}

final int credits = getCreditsToAdd(link.getCredits());
linkCreditsAdded.set(credits > 0);

logger.info("Link credits to add. Credits: '{}'", credits);
synchronized (lock) {
final int linkCredits = link.getCredits();
final int credits = getCreditsToAdd(linkCredits);
logger.info("Link credits='{}', Link credits to add: '{}'", linkCredits, credits);

if (credits > 0) {
link.addCredits(credits);
if (credits > 0) {
link.addCredits(credits);
}
}
}

Expand All @@ -571,22 +564,40 @@ private int getCreditsToAdd(int linkCredits) {
}

final int creditsToAdd;
if (messageQueue.isEmpty() && !hasBackpressure) {
creditsToAdd = prefetch;
final int expectedTotalCredit;
if (prefetch == 0) {
if (r <= Integer.MAX_VALUE) {
expectedTotalCredit = (int) r;
} else {
//This won't really happen in reality.
//For async client, receiveMessages() calls "return receiveMessagesNoBackPressure().limitRate(1, 0);".
//So it will request one by one from this link processor, even though the user's request has no
//back pressure.
//For sync client, the sync subscriber has back pressure.
//The request count uses the the argument of method receiveMessages(int maxMessages).
//It's at most Integer.MAX_VALUE.
expectedTotalCredit = Integer.MAX_VALUE;
}
} else {
synchronized (queueLock) {
final int queuedMessages = pendingMessages.get();
final int pending = queuedMessages + linkCredits;

if (hasBackpressure) {
creditsToAdd = Math.max(Long.valueOf(r).intValue() - pending, 0);
} else {
// If the queue has less than 1/3 of the prefetch, then add the difference to keep the queue full.
creditsToAdd = minimumNumberOfMessages >= queuedMessages
? Math.max(prefetch - pending, 1)
: 0;
}
expectedTotalCredit = prefetch;
}
logger.info("linkCredits: '{}', expectedTotalCredit: '{}'", linkCredits, expectedTotalCredit);

synchronized (queueLock) {
final int queuedMessages = pendingMessages.get();
final int pending = queuedMessages + linkCredits;

if (hasBackpressure) {
creditsToAdd = Math.max(expectedTotalCredit - pending, 0);
} else {
// If the queue has less than 1/3 of the prefetch, then add the difference to keep the queue full.
creditsToAdd = minimumNumberOfMessages >= queuedMessages
? Math.max(expectedTotalCredit - pending, 0)
: 0;
}
logger.info("prefetch: '{}', requested: '{}', linkCredits: '{}', expectedTotalCredit: '{}', queuedMessages:"
+ "'{}', creditsToAdd: '{}', messageQueue.size(): '{}'", getPrefetch(), r, linkCredits,
expectedTotalCredit, queuedMessages, creditsToAdd, messageQueue.size());
}

return creditsToAdd;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ void invalidPrefetch() {
.receiveMode(ServiceBusReceiveMode.PEEK_LOCK);

// Act & Assert
assertThrows(IllegalArgumentException.class, () -> receiverBuilder.prefetchCount(0));
assertThrows(IllegalArgumentException.class, () -> receiverBuilder.prefetchCount(-1));
}

@MethodSource("getProxyConfigurations")
Expand Down
Loading

0 comments on commit ad184d3

Please sign in to comment.