Skip to content

Commit

Permalink
fix(polling): respect request timeout settings
Browse files Browse the repository at this point in the history
(cherry picked from commit 3e31914)
  • Loading branch information
romansmirnov authored and github-actions[bot] committed Dec 20, 2021
1 parent 5e931ec commit ecb5367
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ public void setFailedAttempts(final int failedAttempts) {
}
}

public boolean shouldAttempt(final int attemptThreshold) {
return failedAttempts < attemptThreshold;
}

public void resetFailedAttempts() {
setFailedAttempts(0);
}
Expand Down Expand Up @@ -112,10 +116,11 @@ public boolean hasActiveRequests() {

/**
* Returns whether the request should be repeated. A request should be repeated if the failed
* attempts were reset to 0 (because new jobs became available) whilst the request was running
* attempts were reset to 0 (because new jobs became available) whilst the request was running,
* and if the request's long polling is enabled.
*/
public boolean shouldBeRepeated(final LongPollingActivateJobsRequest request) {
return activeRequestsToBeRepeated.contains(request);
return activeRequestsToBeRepeated.contains(request) && !request.isLongPollingDisabled();
}

public boolean shouldNotifyAndStartNotification() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,16 +85,45 @@ public void activateJobs(
activateJobs(longPollingRequest);
}

protected void completeOrResubmitRequest(
final LongPollingActivateJobsRequest request, final boolean activateImmediately) {
if (request.isLongPollingDisabled()) {
// request is not supposed to use the
// long polling capabilities -> just
// complete the request
request.complete();
return;
}

if (request.isTimedOut()) {
// already timed out, nothing to do here
return;
}

final var type = request.getType();
final var state = getJobTypeState(type);

if (!request.hasScheduledTimer()) {
addTimeOut(state, request);
}

if (activateImmediately) {
activateJobs(request);
} else {
enqueueRequest(state, request);
}
}

public void activateJobs(final LongPollingActivateJobsRequest request) {
actor.run(
() -> {
final InFlightLongPollingActivateJobsRequestsState state =
getJobTypeState(request.getType());

if (state.getFailedAttempts() < failedAttemptThreshold) {
if (state.shouldAttempt(failedAttemptThreshold)) {
activateJobsUnchecked(state, request);
} else {
completeOrEnqueueRequest(state, request);
completeOrResubmitRequest(request, false);
}
});
}
Expand Down Expand Up @@ -169,15 +198,9 @@ private void onCompleted(
actor.submit(
() -> {
state.incrementFailedAttempts(currentTimeMillis());

final boolean shouldBeRepeated = state.shouldBeRepeated(request);
state.removeActiveRequest(request);

if (shouldBeRepeated) {
activateJobs(request);
} else {
completeOrEnqueueRequest(getJobTypeState(request.getType()), request);
}
completeOrResubmitRequest(request, shouldBeRepeated);
});
}
} else {
Expand Down Expand Up @@ -220,26 +243,17 @@ private void resetFailedAttemptsAndHandlePendingRequests(final String jobType) {
}
}

private void completeOrEnqueueRequest(
private void enqueueRequest(
final InFlightLongPollingActivateJobsRequestsState state,
final LongPollingActivateJobsRequest request) {
if (request.isLongPollingDisabled()) {
request.complete();
return;
}
if (!request.isTimedOut()) {
LOG.trace(
"Worker '{}' asked for '{}' jobs of type '{}', but none are available. This request will"
+ " be kept open until a new job of this type is created or until timeout of '{}'.",
request.getWorker(),
request.getMaxJobsToActivate(),
request.getType(),
request.getLongPollingTimeout(longPollingTimeout));
state.enqueueRequest(request);
if (!request.hasScheduledTimer()) {
addTimeOut(state, request);
}
}
LOG.trace(
"Worker '{}' asked for '{}' jobs of type '{}', but none are available. This request will"
+ " be kept open until a new job of this type is created or until timeout of '{}'.",
request.getWorker(),
request.getMaxJobsToActivate(),
request.getType(),
request.getLongPollingTimeout(longPollingTimeout));
state.enqueueRequest(request);
}

private void addTimeOut(
Expand All @@ -251,8 +265,8 @@ private void addTimeOut(
actor.runDelayed(
requestTimeout,
() -> {
state.removeRequest(request);
request.timeout();
state.removeRequest(request);
});
request.setScheduledTimer(timeout);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import static io.camunda.zeebe.test.util.TestUtil.waitUntil;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.timeout;
Expand Down Expand Up @@ -43,6 +44,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.junit.Before;
Expand Down Expand Up @@ -596,6 +598,57 @@ public BrokerResponse<?> handle(final BrokerActivateJobsRequest request)
assertThat(request.hasScheduledTimer()).isFalse();
}

@Test
public void shouldCompleteRequestImmediatellyDespiteNotification() throws Exception {
// given
final LongPollingActivateJobsRequest request =
new LongPollingActivateJobsRequest(
ActivateJobsRequest.newBuilder()
.setType(TYPE)
.setRequestTimeout(-1)
.setMaxJobsToActivate(1)
.build(),
spy(ServerStreamObserver.class));

registerCustomHandlerWithNotification(
(r) -> {
final var partitionId = r.getPartitionId();
if (partitionId == 1) {
brokerClient.notifyJobsAvailable(TYPE);
}
});

// when
handler.activateJobs(request);

// then
waitUntil(request::isCompleted);
verify(stub, times(partitionsCount)).handle(any());
}

@Test
public void shouldTimeOutRequestDespiteMultipleNotificationLoops() throws Exception {
// given
final var request = getLongPollingActivateJobsRequest();

registerCustomHandlerWithNotification(
(r) -> {
final var partitionId = r.getPartitionId();
if (partitionId == 1) {
brokerClient.notifyJobsAvailable(TYPE);
}
});

// when
handler.activateJobs(request);
waitUntil(request::hasScheduledTimer);
actorClock.addTime(Duration.ofMillis(LONG_POLLING_TIMEOUT));
waitUntil(request::isTimedOut);

// then
verify(stub, atLeast(partitionsCount)).handle(any());
}

private List<LongPollingActivateJobsRequest> activateJobsAndWaitUntilBlocked(final int amount) {
return IntStream.range(0, amount)
.boxed()
Expand Down Expand Up @@ -624,4 +677,19 @@ private LongPollingActivateJobsRequest getLongPollingActivateJobsRequest(final S

return new LongPollingActivateJobsRequest(request, responseSpy);
}

private void registerCustomHandlerWithNotification(
final Consumer<BrokerActivateJobsRequest> notification) {
brokerClient.registerHandler(
BrokerActivateJobsRequest.class,
new RequestHandler<BrokerActivateJobsRequest, BrokerResponse<?>>() {

@Override
public BrokerResponse<?> handle(final BrokerActivateJobsRequest request)
throws Exception {
notification.accept(request);
return stub.handle(request);
}
});
}
}

0 comments on commit ecb5367

Please sign in to comment.