Skip to content

Commit

Permalink
refactor: extract response handling to separate method
Browse files Browse the repository at this point in the history
(cherry picked from commit 26f55b8)
  • Loading branch information
romansmirnov authored and github-actions[bot] committed Mar 22, 2022
1 parent 52c9bba commit 389e330
Showing 1 changed file with 57 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@
import io.camunda.zeebe.gateway.impl.broker.RequestDispatchStrategy;
import io.camunda.zeebe.gateway.impl.broker.RoundRobinDispatchStrategy;
import io.camunda.zeebe.gateway.impl.broker.cluster.BrokerTopologyManager;
import io.camunda.zeebe.gateway.impl.broker.response.BrokerResponse;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.ActivateJobsRequest;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.ActivateJobsResponse;
import io.camunda.zeebe.protocol.impl.record.value.job.JobBatchRecord;
import io.camunda.zeebe.protocol.record.ErrorCode;
import io.camunda.zeebe.util.sched.ActorControl;
import java.util.Map;
Expand Down Expand Up @@ -101,37 +103,8 @@ private void activateJobs(

brokerClient
.sendRequest(brokerRequest)
.whenComplete(
(brokerResponse, error) -> {
if (error == null) {
final var response = brokerResponse.getResponse();
final ActivateJobsResponse grpcResponse =
ResponseMapper.toActivateJobsResponse(brokerResponse.getKey(), response);
final int jobsCount = grpcResponse.getJobsCount();
if (jobsCount > 0) {
delegate.onResponse(grpcResponse);
}

final var remainingJobsToActivate = requestState.getRemainingAmount() - jobsCount;
final var shouldPollCurrentPartitionAgain = response.getTruncated();

requestState.setRemainingAmount(remainingJobsToActivate);
requestState.setPollPrevPartition(shouldPollCurrentPartitionAgain);
activateJobs(request, requestState, delegate);
} else {
final boolean wasResourceExhausted = wasResourceExhausted(error);
if (isRejection(error)) {
delegate.onError(error);
return;
} else if (!wasResourceExhausted) {
logErrorResponse(requestState.getCurrentPartition(), request.getType(), error);
}

requestState.setResourceExhaustedWasPresent(wasResourceExhausted);
requestState.setPollPrevPartition(false);
activateJobs(request, requestState, delegate);
}
});
.whenComplete(handleBrokerResponse(request, requestState, delegate));

} else {
// enough jobs activated or no more partitions left to check
final var remainingAmount = requestState.getRemainingAmount();
Expand All @@ -140,6 +113,59 @@ private void activateJobs(
}
}

private BiConsumer<BrokerResponse<JobBatchRecord>, Throwable> handleBrokerResponse(
final InflightActivateJobsRequest request,
final InflightActivateJobsRequestState requestState,
final ResponseObserverDelegate delegate) {

return (brokerResponse, error) -> {
if (error == null) {
handleResponseSuccess(request, requestState, delegate, brokerResponse);
} else {
handleResponseError(request, requestState, delegate, error);
}
};
}

private void handleResponseSuccess(
final InflightActivateJobsRequest request,
final InflightActivateJobsRequestState requestState,
final ResponseObserverDelegate delegate,
final BrokerResponse<JobBatchRecord> brokerResponse) {
final var response = brokerResponse.getResponse();
final ActivateJobsResponse grpcResponse =
ResponseMapper.toActivateJobsResponse(brokerResponse.getKey(), response);
final int jobsCount = grpcResponse.getJobsCount();
if (jobsCount > 0) {
delegate.onResponse(grpcResponse);
}

final var remainingJobsToActivate = requestState.getRemainingAmount() - jobsCount;
final var shouldPollCurrentPartitionAgain = response.getTruncated();

requestState.setRemainingAmount(remainingJobsToActivate);
requestState.setPollPrevPartition(shouldPollCurrentPartitionAgain);
activateJobs(request, requestState, delegate);
}

private void handleResponseError(
final InflightActivateJobsRequest request,
final InflightActivateJobsRequestState state,
final ResponseObserverDelegate delegate,
final Throwable error) {
final var wasResourceExhausted = wasResourceExhausted(error);
if (isRejection(error)) {
delegate.onError(error);
return;
} else if (!wasResourceExhausted) {
logErrorResponse(state.getCurrentPartition(), request.getType(), error);
}

state.setResourceExhaustedWasPresent(wasResourceExhausted);
state.setPollPrevPartition(false);
activateJobs(request, state, delegate);
}

private boolean isRejection(final Throwable error) {
return error != null && BrokerRejectionException.class.isAssignableFrom(error.getClass());
}
Expand Down

0 comments on commit 389e330

Please sign in to comment.