Skip to content

Commit

Permalink
merge: #8444
Browse files Browse the repository at this point in the history
8444: [Backport stable/1.2] fix(polling): cancel scheduled timer on error r=oleschoenburg a=github-actions[bot]

# Description
Backport of #8424 to `stable/1.2`.

relates to #8423

Co-authored-by: Roman <[email protected]>
  • Loading branch information
zeebe-bors-cloud[bot] and romansmirnov authored Dec 20, 2021
2 parents 7577995 + 8f73350 commit 5e931ec
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ private void onCompleted(
.setMessage(errorMsg)
.build();

request.getResponseObserver().onError(StatusProto.toStatusException(status));
request.onError(StatusProto.toStatusException(status));
});
} else {
actor.submit(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,7 @@ public void complete() {
if (isCompleted() || isCanceled()) {
return;
}
if (scheduledTimer != null) {
scheduledTimer.cancel();
}
cancelTimerIfScheduled();
try {
responseObserver.onCompleted();
} catch (final Exception e) {
Expand All @@ -92,7 +90,7 @@ public void onError(final Throwable error) {
if (isCompleted() || isCanceled()) {
return;
}

cancelTimerIfScheduled();
try {
responseObserver.onError(error);
} catch (final Exception e) {
Expand Down Expand Up @@ -151,4 +149,11 @@ public Duration getLongPollingTimeout(final Duration defaultTimeout) {
public boolean isLongPollingDisabled() {
return longPollingTimeout != null && longPollingTimeout.isNegative();
}

private void cancelTimerIfScheduled() {
if (hasScheduledTimer()) {
scheduledTimer.cancel();
scheduledTimer = null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,22 @@

import io.camunda.zeebe.gateway.api.util.StubbedBrokerClient;
import io.camunda.zeebe.gateway.api.util.StubbedBrokerClient.RequestHandler;
import io.camunda.zeebe.gateway.cmd.BrokerRejectionException;
import io.camunda.zeebe.gateway.grpc.ServerStreamObserver;
import io.camunda.zeebe.gateway.impl.broker.request.BrokerActivateJobsRequest;
import io.camunda.zeebe.gateway.impl.broker.response.BrokerError;
import io.camunda.zeebe.gateway.impl.broker.response.BrokerErrorResponse;
import io.camunda.zeebe.gateway.impl.broker.response.BrokerRejection;
import io.camunda.zeebe.gateway.impl.broker.response.BrokerRejectionResponse;
import io.camunda.zeebe.gateway.impl.broker.response.BrokerResponse;
import io.camunda.zeebe.gateway.impl.job.LongPollingActivateJobsHandler;
import io.camunda.zeebe.gateway.impl.job.LongPollingActivateJobsRequest;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.ActivateJobsRequest;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.ActivateJobsResponse;
import io.camunda.zeebe.protocol.impl.record.value.job.JobBatchRecord;
import io.camunda.zeebe.protocol.record.ErrorCode;
import io.camunda.zeebe.protocol.record.RejectionType;
import io.camunda.zeebe.protocol.record.intent.Intent;
import io.camunda.zeebe.util.sched.clock.ControlledActorClock;
import io.camunda.zeebe.util.sched.testing.ActorSchedulerRule;
import io.grpc.Status.Code;
Expand Down Expand Up @@ -510,6 +515,87 @@ public BrokerResponse<?> handle(final BrokerActivateJobsRequest request)
assertThat(response.getJobsList()).hasSize(10);
}

@Test
public void shouldCancelTimerOnResourceExhausted() {
// given
final LongPollingActivateJobsRequest request = getLongPollingActivateJobsRequest();

brokerClient.registerHandler(
BrokerActivateJobsRequest.class,
new RequestHandler<BrokerActivateJobsRequest, BrokerResponse<?>>() {

private int count = 0;

/*
* First execution of the request (count < partitionCount) -> don't activate jobs
* Second execution of the request (count >= partitionCount) -> fail immediately
*/
@Override
public BrokerResponse<?> handle(final BrokerActivateJobsRequest request)
throws Exception {
if (count >= partitionsCount) {
return new BrokerErrorResponse<>(
new BrokerError(ErrorCode.RESOURCE_EXHAUSTED, "backpressure"));
}
count += 1;
return stub.handle(request);
}
});
// when
handler.activateJobs(request);
waitUntil(request::hasScheduledTimer);
brokerClient.notifyJobsAvailable(TYPE);

// then
final ArgumentCaptor<Throwable> throwableCaptor = ArgumentCaptor.forClass(Throwable.class);
verify(request.getResponseObserver(), timeout(1000).times(1))
.onError(throwableCaptor.capture());
assertThat(throwableCaptor.getValue()).isInstanceOf(StatusException.class);

assertThat(request.hasScheduledTimer()).isFalse();
}

@Test
public void shouldCancelTimerOnBrokerRejectionException() {
// given
final LongPollingActivateJobsRequest request = getLongPollingActivateJobsRequest();

brokerClient.registerHandler(
BrokerActivateJobsRequest.class,
new RequestHandler<BrokerActivateJobsRequest, BrokerResponse<?>>() {

private int count = 0;

/*
* First execution of the request (count < partitionCount) -> don't activate jobs
* Second execution of the request (count >= partitionCount) -> fail immediately
*/
@Override
public BrokerResponse<?> handle(final BrokerActivateJobsRequest request)
throws Exception {
if (count >= partitionsCount) {
return new BrokerRejectionResponse<>(
new BrokerRejection(
Intent.UNKNOWN, 1, RejectionType.INVALID_ARGUMENT, "expected"));
}
count += 1;
return stub.handle(request);
}
});
// when
handler.activateJobs(request);
waitUntil(request::hasScheduledTimer);
brokerClient.notifyJobsAvailable(TYPE);

// then
final ArgumentCaptor<Throwable> throwableCaptor = ArgumentCaptor.forClass(Throwable.class);
verify(request.getResponseObserver(), timeout(1000).times(1))
.onError(throwableCaptor.capture());
assertThat(throwableCaptor.getValue()).isInstanceOf(BrokerRejectionException.class);

assertThat(request.hasScheduledTimer()).isFalse();
}

private List<LongPollingActivateJobsRequest> activateJobsAndWaitUntilBlocked(final int amount) {
return IntStream.range(0, amount)
.boxed()
Expand Down

0 comments on commit 5e931ec

Please sign in to comment.