Skip to content

Commit

Permalink
fix: stop activating jobs if activated jobs could not be sent back
Browse files Browse the repository at this point in the history
(cherry picked from commit fea7b1e)
  • Loading branch information
romansmirnov authored and github-actions[bot] committed Mar 22, 2022
1 parent a512804 commit a73e1c5
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import io.camunda.zeebe.gateway.impl.broker.request.BrokerActivateJobsRequest;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.ActivateJobsRequest;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.ActivateJobsResponse;
import io.camunda.zeebe.util.Either;
import io.camunda.zeebe.util.sched.ScheduledTimer;
import java.time.Duration;
import java.util.Objects;
Expand Down Expand Up @@ -83,14 +84,18 @@ public boolean isCompleted() {
return isCompleted;
}

public void onResponse(final ActivateJobsResponse grpcResponse) {
public Either<Exception, Boolean> tryToSendActivatedJobs(
final ActivateJobsResponse grpcResponse) {
if (isOpen()) {
try {
responseObserver.onNext(grpcResponse);
return Either.right(true);
} catch (final Exception e) {
LOG.warn("Failed to send response to client.", e);
return Either.left(e);
}
}
return Either.right(false);
}

public void onError(final Throwable error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,6 @@ private void activateJobsUnchecked(
activateJobsHandler.activateJobs(
partitionsCount,
request,
response -> onResponse(request, response),
error -> onError(state, request, error),
(remainingAmount, containedResourceExhaustedResponse) ->
onCompleted(state, request, remainingAmount, containedResourceExhaustedResponse));
Expand Down Expand Up @@ -219,11 +218,6 @@ private void onCompleted(
}
}

private void onResponse(
final InflightActivateJobsRequest request, final ActivateJobsResponse activateJobsResponse) {
actor.submit(() -> request.onResponse(activateJobsResponse));
}

private void onError(
final InFlightLongPollingActivateJobsRequestsState state,
final InflightActivateJobsRequest request,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@

import static io.camunda.zeebe.gateway.impl.job.ActivateJobsHandler.toInflightActivateJobsRequest;

import com.google.rpc.Code;
import com.google.rpc.Status;
import io.camunda.zeebe.gateway.Loggers;
import io.camunda.zeebe.gateway.ResponseMapper;
import io.camunda.zeebe.gateway.cmd.BrokerErrorException;
Expand All @@ -24,7 +26,9 @@
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.ActivateJobsResponse;
import io.camunda.zeebe.protocol.impl.record.value.job.JobBatchRecord;
import io.camunda.zeebe.protocol.record.ErrorCode;
import io.camunda.zeebe.util.Either;
import io.camunda.zeebe.util.sched.ActorControl;
import io.grpc.protobuf.StatusProto;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;
Expand All @@ -37,6 +41,10 @@
public final class RoundRobinActivateJobsHandler
implements ActivateJobsHandler, Consumer<ActorControl> {

private static final String ACTIVATE_JOB_NOT_SENT_MSG = "Failed to send activated jobs to client";
private static final String ACTIVATE_JOB_NOT_SENT_MSG_WITH_REASON =
ACTIVATE_JOB_NOT_SENT_MSG + ", failed with: %s";

private final Map<String, RequestDispatchStrategy> jobTypeToNextPartitionId =
new ConcurrentHashMap<>();
private final BrokerClient brokerClient;
Expand Down Expand Up @@ -64,7 +72,6 @@ public void activateJobs(
activateJobs(
topology.getPartitionsCount(),
inflightRequest,
responseObserver::onNext,
responseObserver::onError,
(remainingAmount, resourceExhaustedWasPresent) -> responseObserver.onCompleted());
}
Expand All @@ -73,7 +80,6 @@ public void activateJobs(
public void activateJobs(
final int partitionsCount,
final InflightActivateJobsRequest request,
final Consumer<ActivateJobsResponse> onResponse,
final Consumer<Throwable> onError,
final BiConsumer<Integer, Boolean> onCompleted) {
final var jobType = request.getType();
Expand All @@ -82,7 +88,7 @@ public void activateJobs(

final var requestState =
new InflightActivateJobsRequestState(partitionIterator, maxJobsToActivate);
final var delegate = new ResponseObserverDelegate(onResponse, onError, onCompleted);
final var delegate = new ResponseObserverDelegate(onError, onCompleted);

activateJobs(request, requestState, delegate);
}
Expand Down Expand Up @@ -142,9 +148,22 @@ private void handleResponseSuccess(
final var response = brokerResponse.getResponse();
final ActivateJobsResponse grpcResponse =
ResponseMapper.toActivateJobsResponse(brokerResponse.getKey(), response);
final int jobsCount = grpcResponse.getJobsCount();
if (jobsCount > 0) {
delegate.onResponse(grpcResponse);

final var jobsCount = grpcResponse.getJobsCount();
final var jobsActivated = jobsCount > 0;

if (jobsActivated) {
final var result = request.tryToSendActivatedJobs(grpcResponse);
final var responseWasSent = result.isRight() && result.get();

if (!responseWasSent) {
final var reason = createReasonMessage(result);
final var jobType = request.getType();

logResponseNotSent(jobType, reason);
cancelActivateJobsRequest(reason, delegate);
return;
}
}

final var remainingJobsToActivate = requestState.getRemainingAmount() - jobsCount;
Expand All @@ -156,6 +175,23 @@ private void handleResponseSuccess(
});
}

private String createReasonMessage(final Either<Exception, Boolean> resultValue) {
final String errorMessage;
if (resultValue.isLeft()) {
final var exception = resultValue.getLeft();
errorMessage = String.format(ACTIVATE_JOB_NOT_SENT_MSG_WITH_REASON, exception.getMessage());
} else {
errorMessage = ACTIVATE_JOB_NOT_SENT_MSG;
}
return errorMessage;
}

private void cancelActivateJobsRequest(
final String reason, final ResponseObserverDelegate delegate) {
final var status = Status.newBuilder().setCode(Code.CANCELLED_VALUE).setMessage(reason).build();
delegate.onError(StatusProto.toStatusException(status));
}

private void handleResponseError(
final InflightActivateJobsRequest request,
final InflightActivateJobsRequestState state,
Expand Down Expand Up @@ -195,6 +231,11 @@ private void logErrorResponse(final int partition, final String jobType, final T
"Failed to activate jobs for type {} from partition {}", jobType, partition, error);
}

private void logResponseNotSent(final String jobType, final String reason) {
Loggers.GATEWAY_LOGGER.debug(
"Failed to send back activated jobs for type {}, because: {}", jobType, reason);
}

private PartitionIdIterator partitionIdIteratorForType(
final String jobType, final int partitionsCount) {
final RequestDispatchStrategy nextPartitionSupplier =
Expand All @@ -206,23 +247,16 @@ private PartitionIdIterator partitionIdIteratorForType(

private static final class ResponseObserverDelegate {

private final Consumer<ActivateJobsResponse> onResponseDelegate;
private final Consumer<Throwable> onErrorDelegate;
private final BiConsumer<Integer, Boolean> onCompletedDelegate;

private ResponseObserverDelegate(
final Consumer<ActivateJobsResponse> onResponseDelegate,
final Consumer<Throwable> onErrorDelegate,
final BiConsumer<Integer, Boolean> onCompletedDelegate) {
this.onResponseDelegate = onResponseDelegate;
this.onErrorDelegate = onErrorDelegate;
this.onCompletedDelegate = onCompletedDelegate;
}

public void onResponse(final ActivateJobsResponse response) {
onResponseDelegate.accept(response);
}

public void onError(final Throwable t) {
onErrorDelegate.accept(t);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.timeout;
Expand All @@ -35,6 +37,7 @@
import io.camunda.zeebe.protocol.record.ErrorCode;
import io.camunda.zeebe.protocol.record.RejectionType;
import io.camunda.zeebe.protocol.record.intent.Intent;
import io.camunda.zeebe.util.Either;
import io.camunda.zeebe.util.sched.Actor;
import io.camunda.zeebe.util.sched.ActorControl;
import io.camunda.zeebe.util.sched.clock.ControlledActorClock;
Expand Down Expand Up @@ -707,6 +710,56 @@ public void shouldTimeOutRequestDespiteMultipleNotificationLoops() throws Except
verify(stub, atLeast(partitionsCount)).handle(any());
}

@Test
public void shouldNotContinueWithNextPartitionIfResponseIsNotSend() throws Exception {
// given
final var request =
spy(
new InflightActivateJobsRequest(
getNextRequestId(),
ActivateJobsRequest.newBuilder()
.setType(TYPE)
.setMaxJobsToActivate(3 * MAX_JOBS_TO_ACTIVATE)
.setRequestTimeout(500)
.build(),
spy(ServerStreamObserver.class)));

stub.addAvailableJobs(TYPE, MAX_JOBS_TO_ACTIVATE);
doReturn(Either.right(false)).when(request).tryToSendActivatedJobs(any());

// when
handler.activateJobs(request);
waitUntil(request::isAborted);

// then
verify(stub, times(1)).handle(request.getRequest());
}

@Test
public void shouldNotContinueWithNextPartitionIfResponseFailed() throws Exception {
// given
final var request =
new InflightActivateJobsRequest(
getNextRequestId(),
ActivateJobsRequest.newBuilder()
.setType(TYPE)
.setMaxJobsToActivate(3 * MAX_JOBS_TO_ACTIVATE)
.setRequestTimeout(500)
.build(),
spy(ServerStreamObserver.class));

stub.addAvailableJobs(TYPE, MAX_JOBS_TO_ACTIVATE);
final var responseObserver = request.getResponseObserver();
doThrow(new RuntimeException("foo")).when(responseObserver).onNext(any());

// when
handler.activateJobs(request);
waitUntil(request::isAborted);

// then
verify(stub, times(1)).handle(request.getRequest());
}

private List<InflightActivateJobsRequest> activateJobsAndWaitUntilBlocked(final int amount) {
return IntStream.range(0, amount)
.boxed()
Expand Down

0 comments on commit a73e1c5

Please sign in to comment.