Skip to content

Commit

Permalink
refactor: make RoundRobinHandler ready to use an actor
Browse files Browse the repository at this point in the history
* This allows to submit actor jobs when a response is received from the
  broker
* These actor jobs are handled by the actor of the RoundRobinHandler
* The actor is being shared between the LongPollingActivateJobsHandler
  and the RoundRobinActivateJobsHandler to control the access to the
  InflightActivateJobsRequest

(cherry picked from commit 6ff8bc3)
  • Loading branch information
romansmirnov authored and github-actions[bot] committed Mar 22, 2022
1 parent a76bc16 commit 00d3420
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 57 deletions.
42 changes: 22 additions & 20 deletions gateway/src/main/java/io/camunda/zeebe/gateway/Gateway.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import me.dinowernli.grpc.prometheus.Configuration;
Expand Down Expand Up @@ -113,15 +114,8 @@ public void start() throws IOException {
healthManager.setStatus(Status.STARTING);
brokerClient = buildBrokerClient();

final ActivateJobsHandler activateJobsHandler;
if (gatewayCfg.getLongPolling().isEnabled()) {
final LongPollingActivateJobsHandler longPollingHandler =
buildLongPollingHandler(brokerClient);
submitLongPollingActor(longPollingHandler);
activateJobsHandler = longPollingHandler;
} else {
activateJobsHandler = new RoundRobinActivateJobsHandler(brokerClient);
}
final var activateJobsHandler = buildActivateJobsHandler(brokerClient);
submitActivateJobsActor((Consumer<ActorControl>) activateJobsHandler);

final EndpointManager endpointManager = new EndpointManager(brokerClient, activateJobsHandler);
final GatewayGrpcService gatewayGrpcService = new GatewayGrpcService(endpointManager);
Expand Down Expand Up @@ -155,17 +149,6 @@ private static NettyServerBuilder setNetworkConfig(final NetworkCfg cfg) {
.permitKeepAliveWithoutCalls(false);
}

private void submitLongPollingActor(final LongPollingActivateJobsHandler handler) {
final var actorStartedFuture = new CompletableFuture<ActorControl>();
final var actor =
Actor.newActor()
.name(handler.getName())
.actorStartedHandler(handler.andThen(actorStartedFuture::complete))
.build();
actorSchedulingService.submitActor(actor);
actorStartedFuture.join();
}

private void setSecurityConfig(final ServerBuilder<?> serverBuilder, final SecurityCfg security) {
final var certificateChainPath = security.getCertificateChainPath();
final var privateKeyPath = security.getPrivateKeyPath();
Expand Down Expand Up @@ -203,6 +186,25 @@ private BrokerClient buildBrokerClient() {
return brokerClientFactory.apply(gatewayCfg);
}

private void submitActivateJobsActor(final Consumer<ActorControl> consumer) {
final var actorStartedFuture = new CompletableFuture<ActorControl>();
final var actor =
Actor.newActor()
.name("ActivateJobsHandler")
.actorStartedHandler(consumer.andThen(actorStartedFuture::complete))
.build();
actorSchedulingService.submitActor(actor);
actorStartedFuture.join();
}

private ActivateJobsHandler buildActivateJobsHandler(final BrokerClient brokerClient) {
if (gatewayCfg.getLongPolling().isEnabled()) {
return buildLongPollingHandler(brokerClient);
} else {
return new RoundRobinActivateJobsHandler(brokerClient);
}
}

private LongPollingActivateJobsHandler buildLongPollingHandler(final BrokerClient brokerClient) {
return LongPollingActivateJobsHandler.newBuilder().setBrokerClient(brokerClient).build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,10 @@ private LongPollingActivateJobsHandler(
metrics = new LongPollingMetrics();
}

public String getName() {
return "GatewayLongPollingJobHandler";
}

@Override
public void accept(ActorControl actor) {
this.actor = actor;
activateJobsHandler.accept(actor);
onActorStarted();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.ActivateJobsRequest;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.ActivateJobsResponse;
import io.camunda.zeebe.protocol.record.ErrorCode;
import io.camunda.zeebe.util.sched.ActorControl;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;
Expand All @@ -32,18 +33,26 @@
* Iterates in round-robin fashion over partitions to activate jobs. Uses a map from job type to
* partition-IDs to determine the next partition to use.
*/
public final class RoundRobinActivateJobsHandler implements ActivateJobsHandler {
public final class RoundRobinActivateJobsHandler
implements ActivateJobsHandler, Consumer<ActorControl> {

private final Map<String, RequestDispatchStrategy> jobTypeToNextPartitionId =
new ConcurrentHashMap<>();
private final BrokerClient brokerClient;
private final BrokerTopologyManager topologyManager;

private ActorControl actor;

public RoundRobinActivateJobsHandler(final BrokerClient brokerClient) {
this.brokerClient = brokerClient;
topologyManager = brokerClient.getTopologyManager();
}

@Override
public void accept(ActorControl actor) {
this.actor = actor;
}

@Override
public void activateJobs(
final ActivateJobsRequest request,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,11 @@

import io.camunda.zeebe.gateway.EndpointManager;
import io.camunda.zeebe.gateway.GatewayGrpcService;
import io.camunda.zeebe.gateway.impl.broker.BrokerClient;
import io.camunda.zeebe.gateway.impl.configuration.GatewayCfg;
import io.camunda.zeebe.gateway.impl.job.ActivateJobsHandler;
import io.camunda.zeebe.gateway.impl.job.LongPollingActivateJobsHandler;
import io.camunda.zeebe.gateway.impl.job.RoundRobinActivateJobsHandler;
import io.camunda.zeebe.gateway.protocol.GatewayGrpc;
import io.camunda.zeebe.gateway.protocol.GatewayGrpc.GatewayBlockingStub;
import io.camunda.zeebe.util.sched.Actor;
Expand All @@ -22,30 +25,30 @@
import io.grpc.inprocess.InProcessServerBuilder;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

@SuppressWarnings({"rawtypes", "unchecked"})
@SuppressWarnings({"unchecked"})
public final class StubbedGateway {

private static final String SERVER_NAME = "server";

private final StubbedBrokerClient brokerClient;
private final ActivateJobsHandler activateJobsHandler;
private final ActorScheduler actorScheduler;
private final GatewayCfg config;
private Server server;

public StubbedGateway(
final ActorScheduler actorScheduler,
final StubbedBrokerClient brokerClient,
final ActivateJobsHandler activateJobsHandler) {
final GatewayCfg config) {
this.actorScheduler = actorScheduler;
this.brokerClient = brokerClient;
this.activateJobsHandler = activateJobsHandler;
this.config = config;
}

public void start() throws IOException {
if (activateJobsHandler instanceof LongPollingActivateJobsHandler handler) {
submitLongPollingActor(handler);
}
final var activateJobsHandler = buildActivateJobsHandler(brokerClient);
submitActivateJobsActor((Consumer<ActorControl>) activateJobsHandler);

final EndpointManager endpointManager = new EndpointManager(brokerClient, activateJobsHandler);
final GatewayGrpcService gatewayGrpcService = new GatewayGrpcService(endpointManager);
Expand All @@ -56,17 +59,6 @@ public void start() throws IOException {
server.start();
}

private void submitLongPollingActor(final LongPollingActivateJobsHandler handler) {
final var actorStartedFuture = new CompletableFuture<ActorControl>();
final var actor =
Actor.newActor()
.name(handler.getName())
.actorStartedHandler(handler.andThen(actorStartedFuture::complete))
.build();
actorScheduler.submitActor(actor);
actorStartedFuture.join();
}

public void stop() {
if (server != null && !server.isShutdown()) {
server.shutdownNow();
Expand All @@ -83,4 +75,27 @@ public GatewayBlockingStub buildClient() {
InProcessChannelBuilder.forName(SERVER_NAME).directExecutor().build();
return GatewayGrpc.newBlockingStub(channel);
}

private void submitActivateJobsActor(final Consumer<ActorControl> consumer) {
final var actorStartedFuture = new CompletableFuture<ActorControl>();
final var actor =
Actor.newActor()
.name("ActivateJobsHandler")
.actorStartedHandler(consumer.andThen(actorStartedFuture::complete))
.build();
actorScheduler.submitActor(actor);
actorStartedFuture.join();
}

private ActivateJobsHandler buildActivateJobsHandler(final BrokerClient brokerClient) {
if (config.getLongPolling().isEnabled()) {
return buildLongPollingHandler(brokerClient);
} else {
return new RoundRobinActivateJobsHandler(brokerClient);
}
}

private LongPollingActivateJobsHandler buildLongPollingHandler(final BrokerClient brokerClient) {
return LongPollingActivateJobsHandler.newBuilder().setBrokerClient(brokerClient).build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,6 @@
package io.camunda.zeebe.gateway.api.util;

import io.camunda.zeebe.gateway.impl.configuration.GatewayCfg;
import io.camunda.zeebe.gateway.impl.job.ActivateJobsHandler;
import io.camunda.zeebe.gateway.impl.job.LongPollingActivateJobsHandler;
import io.camunda.zeebe.gateway.impl.job.RoundRobinActivateJobsHandler;
import io.camunda.zeebe.gateway.protocol.GatewayGrpc.GatewayBlockingStub;
import io.camunda.zeebe.util.sched.testing.ActorSchedulerRule;
import org.junit.rules.ExternalResource;
Expand All @@ -20,26 +17,18 @@ public final class StubbedGatewayRule extends ExternalResource {
protected StubbedGateway gateway;
protected GatewayBlockingStub client;
private final ActorSchedulerRule actorSchedulerRule;
private final GatewayCfg config;
private final StubbedBrokerClient brokerClient;
private final ActivateJobsHandler activateJobsHandler;

public StubbedGatewayRule(final ActorSchedulerRule actorSchedulerRule, final GatewayCfg config) {
this.actorSchedulerRule = actorSchedulerRule;
brokerClient = new StubbedBrokerClient();
activateJobsHandler = getActivateJobsHandler(config, brokerClient);
}

private static ActivateJobsHandler getActivateJobsHandler(
final GatewayCfg config, final StubbedBrokerClient brokerClient) {
if (config.getLongPolling().isEnabled()) {
return LongPollingActivateJobsHandler.newBuilder().setBrokerClient(brokerClient).build();
}
return new RoundRobinActivateJobsHandler(brokerClient);
this.config = config;
}

@Override
protected void before() throws Throwable {
gateway = new StubbedGateway(actorSchedulerRule.get(), brokerClient, activateJobsHandler);
gateway = new StubbedGateway(actorSchedulerRule.get(), brokerClient, config);
gateway.start();
client = gateway.buildClient();
}
Expand Down

0 comments on commit 00d3420

Please sign in to comment.