Skip to content

Commit

Permalink
merge: #8959
Browse files Browse the repository at this point in the history
8959: [Backport stable/1.3] #3631: Re-activate jobs r=romansmirnov a=github-actions[bot]

# Description
Backport of #8879 to `stable/1.3`.

relates to #3631

Co-authored-by: Roman <[email protected]>
  • Loading branch information
zeebe-bors-camunda[bot] and romansmirnov authored Mar 23, 2022
2 parents 86564c7 + 78ac46a commit 4a9e051
Show file tree
Hide file tree
Showing 13 changed files with 864 additions and 264 deletions.
34 changes: 25 additions & 9 deletions gateway/src/main/java/io/camunda/zeebe/gateway/Gateway.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import io.camunda.zeebe.gateway.interceptors.impl.DecoratedInterceptor;
import io.camunda.zeebe.gateway.interceptors.impl.InterceptorRepository;
import io.camunda.zeebe.gateway.query.impl.QueryApiImpl;
import io.camunda.zeebe.util.sched.Actor;
import io.camunda.zeebe.util.sched.ActorControl;
import io.camunda.zeebe.util.sched.ActorSchedulingService;
import io.grpc.BindableService;
import io.grpc.Server;
Expand All @@ -38,7 +40,9 @@
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import me.dinowernli.grpc.prometheus.Configuration;
Expand Down Expand Up @@ -110,15 +114,8 @@ public void start() throws IOException {
healthManager.setStatus(Status.STARTING);
brokerClient = buildBrokerClient();

final ActivateJobsHandler activateJobsHandler;
if (gatewayCfg.getLongPolling().isEnabled()) {
final LongPollingActivateJobsHandler longPollingHandler =
buildLongPollingHandler(brokerClient);
actorSchedulingService.submitActor(longPollingHandler);
activateJobsHandler = longPollingHandler;
} else {
activateJobsHandler = new RoundRobinActivateJobsHandler(brokerClient);
}
final var activateJobsHandler = buildActivateJobsHandler(brokerClient);
submitActorToActivateJobs((Consumer<ActorControl>) activateJobsHandler);

final EndpointManager endpointManager = new EndpointManager(brokerClient, activateJobsHandler);
final GatewayGrpcService gatewayGrpcService = new GatewayGrpcService(endpointManager);
Expand Down Expand Up @@ -189,6 +186,25 @@ private BrokerClient buildBrokerClient() {
return brokerClientFactory.apply(gatewayCfg);
}

private void submitActorToActivateJobs(final Consumer<ActorControl> consumer) {
final var actorStartedFuture = new CompletableFuture<ActorControl>();
final var actor =
Actor.newActor()
.name("ActivateJobsHandler")
.actorStartedHandler(consumer.andThen(actorStartedFuture::complete))
.build();
actorSchedulingService.submitActor(actor);
actorStartedFuture.join();
}

private ActivateJobsHandler buildActivateJobsHandler(final BrokerClient brokerClient) {
if (gatewayCfg.getLongPolling().isEnabled()) {
return buildLongPollingHandler(brokerClient);
} else {
return new RoundRobinActivateJobsHandler(brokerClient);
}
}

private LongPollingActivateJobsHandler buildLongPollingHandler(final BrokerClient brokerClient) {
return LongPollingActivateJobsHandler.newBuilder().setBrokerClient(brokerClient).build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,13 @@
import io.camunda.zeebe.gateway.grpc.ServerStreamObserver;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.ActivateJobsRequest;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.ActivateJobsResponse;
import java.util.concurrent.atomic.AtomicLong;

/** Can handle an 'activate jobs' request from a client. */
public interface ActivateJobsHandler {

static final AtomicLong ACTIVATE_JOBS_REQUEST_ID_GENERATOR = new AtomicLong(1);

/**
* Handle activate jobs request from a client
*
Expand All @@ -22,4 +25,11 @@ public interface ActivateJobsHandler {
*/
void activateJobs(
ActivateJobsRequest request, ServerStreamObserver<ActivateJobsResponse> responseObserver);

public static InflightActivateJobsRequest toInflightActivateJobsRequest(
final ActivateJobsRequest request,
final ServerStreamObserver<ActivateJobsResponse> responseObserver) {
return new InflightActivateJobsRequest(
ACTIVATE_JOBS_REQUEST_ID_GENERATOR.getAndIncrement(), request, responseObserver);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ public final class InFlightLongPollingActivateJobsRequestsState {

private final String jobType;
private final LongPollingMetrics metrics;
private final Queue<LongPollingActivateJobsRequest> activeRequests = new LinkedList<>();
private final Queue<LongPollingActivateJobsRequest> pendingRequests = new LinkedList<>();
private final Set<LongPollingActivateJobsRequest> activeRequestsToBeRepeated = new HashSet<>();
private final Queue<InflightActivateJobsRequest> activeRequests = new LinkedList<>();
private final Queue<InflightActivateJobsRequest> pendingRequests = new LinkedList<>();
private final Set<InflightActivateJobsRequest> activeRequestsToBeRepeated = new HashSet<>();
private int failedAttempts;
private long lastUpdatedTime;

Expand Down Expand Up @@ -60,14 +60,14 @@ public long getLastUpdatedTime() {
return lastUpdatedTime;
}

public void enqueueRequest(final LongPollingActivateJobsRequest request) {
public void enqueueRequest(final InflightActivateJobsRequest request) {
if (!pendingRequests.contains(request)) {
pendingRequests.offer(request);
}
removeObsoleteRequestsAndUpdateMetrics();
}

public Queue<LongPollingActivateJobsRequest> getPendingRequests() {
public Queue<InflightActivateJobsRequest> getPendingRequests() {
removeObsoleteRequestsAndUpdateMetrics();
return pendingRequests;
}
Expand All @@ -79,29 +79,32 @@ private void removeObsoleteRequestsAndUpdateMetrics() {
metrics.setBlockedRequestsCount(jobType, pendingRequests.size());
}

private boolean isObsolete(final LongPollingActivateJobsRequest request) {
return request.isTimedOut() || request.isCanceled() || request.isCompleted();
private boolean isObsolete(final InflightActivateJobsRequest request) {
return request.isTimedOut()
|| request.isCanceled()
|| request.isCompleted()
|| request.isAborted();
}

public void removeRequest(final LongPollingActivateJobsRequest request) {
public void removeRequest(final InflightActivateJobsRequest request) {
pendingRequests.remove(request);
removeObsoleteRequestsAndUpdateMetrics();
}

public LongPollingActivateJobsRequest getNextPendingRequest() {
public InflightActivateJobsRequest getNextPendingRequest() {
removeObsoleteRequestsAndUpdateMetrics();
final LongPollingActivateJobsRequest request = pendingRequests.poll();
final InflightActivateJobsRequest request = pendingRequests.poll();
metrics.setBlockedRequestsCount(jobType, pendingRequests.size());
return request;
}

public void addActiveRequest(final LongPollingActivateJobsRequest request) {
public void addActiveRequest(final InflightActivateJobsRequest request) {
activeRequests.offer(request);
pendingRequests.remove(request);
activeRequestsToBeRepeated.remove(request);
}

public void removeActiveRequest(final LongPollingActivateJobsRequest request) {
public void removeActiveRequest(final InflightActivateJobsRequest request) {
activeRequests.remove(request);
activeRequestsToBeRepeated.remove(request);
}
Expand All @@ -116,7 +119,7 @@ public boolean hasActiveRequests() {
* attempts were reset to 0 (because new jobs became available) whilst the request was running,
* and if the request's long polling is enabled.
*/
public boolean shouldBeRepeated(final LongPollingActivateJobsRequest request) {
public boolean shouldBeRepeated(final InflightActivateJobsRequest request) {
return activeRequestsToBeRepeated.contains(request) && !request.isLongPollingDisabled();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,13 @@
import io.camunda.zeebe.gateway.impl.broker.request.BrokerActivateJobsRequest;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.ActivateJobsRequest;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.ActivateJobsResponse;
import io.camunda.zeebe.util.Either;
import io.camunda.zeebe.util.sched.ScheduledTimer;
import java.time.Duration;
import java.util.Objects;
import org.slf4j.Logger;

public final class LongPollingActivateJobsRequest {
public class InflightActivateJobsRequest {

private static final Logger LOG = Loggers.GATEWAY_LOGGER;
private final long requestId;
Expand All @@ -32,8 +33,9 @@ public final class LongPollingActivateJobsRequest {
private ScheduledTimer scheduledTimer;
private boolean isTimedOut;
private boolean isCompleted;
private boolean isAborted;

public LongPollingActivateJobsRequest(
public InflightActivateJobsRequest(
final long requestId,
final ActivateJobsRequest request,
final ServerStreamObserver<ActivateJobsResponse> responseObserver) {
Expand All @@ -47,7 +49,7 @@ public LongPollingActivateJobsRequest(
request.getRequestTimeout());
}

private LongPollingActivateJobsRequest(
private InflightActivateJobsRequest(
final long requestId,
final BrokerActivateJobsRequest request,
final ServerStreamObserver<ActivateJobsResponse> responseObserver,
Expand All @@ -66,7 +68,7 @@ private LongPollingActivateJobsRequest(
}

public void complete() {
if (isCompleted() || isCanceled()) {
if (!isOpen()) {
return;
}
cancelTimerIfScheduled();
Expand All @@ -82,26 +84,44 @@ public boolean isCompleted() {
return isCompleted;
}

public void onResponse(final ActivateJobsResponse grpcResponse) {
if (!(isCompleted() || isCanceled())) {
/**
* Sends activated jobs to the respective client.
*
* @param activatedJobs to send back to the client
* @return an instance of {@link Either} indicating the following:
* <ul>
* <li>{@link Either#get() == true}: if the activated jobs have been sent back to the client
* <li>{@link Either#get() == false}: if the activated jobs couldn't be sent back to the
* client
* <li>{@link Either#getLeft() != null}: if sending back the activated jobs failed with an
* exception (note: in this case {@link Either#isRight() == false})
* </ul>
*/
public Either<Exception, Boolean> tryToSendActivatedJobs(
final ActivateJobsResponse activatedJobs) {
if (isOpen()) {
try {
responseObserver.onNext(grpcResponse);
responseObserver.onNext(activatedJobs);
return Either.right(true);
} catch (final Exception e) {
LOG.warn("Failed to send response to client.", e);
return Either.left(e);
}
}
return Either.right(false);
}

public void onError(final Throwable error) {
if (isCompleted() || isCanceled()) {
if (!isOpen()) {
return;
}
cancelTimerIfScheduled();
try {
responseObserver.onError(error);
} catch (final Exception e) {
LOG.warn("Failed to send response to client.", e);
LOG.warn("Failed to send terminating error to client.", e);
}
isAborted = true;
}

public void timeout() {
Expand Down Expand Up @@ -163,6 +183,14 @@ private void cancelTimerIfScheduled() {
}
}

public boolean isAborted() {
return isAborted;
}

public boolean isOpen() {
return !(isCompleted() || isCanceled() || isAborted());
}

@Override
public int hashCode() {
return Objects.hash(jobType, maxJobsToActivate, requestId, worker);
Expand All @@ -179,7 +207,7 @@ public boolean equals(Object obj) {
if (getClass() != obj.getClass()) {
return false;
}
final var other = (LongPollingActivateJobsRequest) obj;
final var other = (InflightActivateJobsRequest) obj;
return Objects.equals(jobType, other.jobType)
&& maxJobsToActivate == other.maxJobsToActivate
&& requestId == other.requestId
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.gateway.impl.job;

import io.camunda.zeebe.gateway.impl.broker.PartitionIdIterator;

public class InflightActivateJobsRequestState {

private final PartitionIdIterator iterator;
private int remainingAmount;
private boolean pollPrevPartition;
private boolean resourceExhaustedWasPresent;

public InflightActivateJobsRequestState(
final PartitionIdIterator iterator, final int remainingAmount) {
this.iterator = iterator;
this.remainingAmount = remainingAmount;
}

private boolean hasNextPartition() {
return iterator.hasNext();
}

public int getCurrentPartition() {
return iterator.getCurrentPartitionId();
}

public int getNextPartition() {
return pollPrevPartition ? iterator.getCurrentPartitionId() : iterator.next();
}

public int getRemainingAmount() {
return remainingAmount;
}

public void setRemainingAmount(int remainingAmount) {
this.remainingAmount = remainingAmount;
}

public boolean wasResourceExhaustedPresent() {
return resourceExhaustedWasPresent;
}

public void setResourceExhaustedWasPresent(final boolean resourceExhaustedWasPresent) {
this.resourceExhaustedWasPresent = resourceExhaustedWasPresent;
}

public void setPollPrevPartition(boolean pollPrevPartition) {
this.pollPrevPartition = pollPrevPartition;
}

public boolean shouldActivateJobs() {
return remainingAmount > 0 && (pollPrevPartition || hasNextPartition());
}
}
Loading

0 comments on commit 4a9e051

Please sign in to comment.