Skip to content

Commit

Permalink
fix: don't block actor thread to submit activate jobs handler
Browse files Browse the repository at this point in the history
(cherry picked from commit 403942f)
  • Loading branch information
romansmirnov authored and lenaschoenburg committed Mar 28, 2022
1 parent 6a7fc0c commit 239b7f0
Show file tree
Hide file tree
Showing 14 changed files with 197 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,20 @@ void startupInternal(
clusterServices.getMembershipService(),
clusterServices.getEventService());

brokerStartupContext.setEmbeddedGatewayService(embeddedGatewayService);
final var embeddedGatewayServiceFuture = embeddedGatewayService.start();
concurrencyControl.runOnCompletion(
embeddedGatewayServiceFuture,
(gateway, error) -> {
if (error != null) {
startupFuture.completeExceptionally(error);
return;
}

final var springBridge = brokerStartupContext.getSpringBrokerBridge();
final var gateway = embeddedGatewayService.get();
springBridge.registerBrokerClient(gateway::getBrokerClient);

startupFuture.complete(brokerStartupContext);
brokerStartupContext.setEmbeddedGatewayService(embeddedGatewayService);
final var springBridge = brokerStartupContext.getSpringBrokerBridge();
springBridge.registerBrokerClient(gateway::getBrokerClient);
startupFuture.complete(brokerStartupContext);
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@
import io.camunda.zeebe.gateway.impl.broker.BrokerClientImpl;
import io.camunda.zeebe.gateway.impl.configuration.GatewayCfg;
import io.camunda.zeebe.util.sched.ActorScheduler;
import java.io.IOException;
import java.io.UncheckedIOException;
import io.camunda.zeebe.util.sched.future.ActorFuture;
import java.util.function.Function;

public final class EmbeddedGatewayService implements AutoCloseable {
Expand All @@ -34,7 +33,6 @@ public EmbeddedGatewayService(
new BrokerClientImpl(
cfg, messagingService, membershipService, eventService, actorScheduler, false);
gateway = new Gateway(configuration.getGateway(), brokerClientFactory, actorScheduler);
startGateway();
}

@Override
Expand All @@ -48,11 +46,7 @@ public Gateway get() {
return gateway;
}

private void startGateway() {
try {
gateway.start();
} catch (final IOException e) {
throw new UncheckedIOException("Gateway was not able to start", e);
}
public ActorFuture<Gateway> start() {
return gateway.start();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.camunda.zeebe.broker.system.monitoring.BrokerHealthCheckService;
import io.camunda.zeebe.protocol.impl.encoding.BrokerInfo;
import io.camunda.zeebe.test.util.socket.SocketUtil;
import io.camunda.zeebe.util.sched.Actor;
import io.camunda.zeebe.util.sched.ActorScheduler;
import io.camunda.zeebe.util.sched.TestConcurrencyControl;
import io.camunda.zeebe.util.sched.future.ActorFuture;
Expand Down Expand Up @@ -58,6 +59,7 @@ class StartupBehavior {

private ActorFuture<BrokerStartupContext> startupFuture;
private ActorScheduler actorScheduler;
private Actor actor;

@BeforeEach
void setUp() {
Expand All @@ -81,6 +83,9 @@ void setUp() {
final var port = SocketUtil.getNextAddress().getPort();
final var commandApiCfg = TEST_BROKER_CONFIG.getGateway().getNetwork();
commandApiCfg.setPort(port);

actor = Actor.newActor().build();
actorScheduler.submitActor(actor);
}

@AfterEach
Expand All @@ -95,7 +100,10 @@ void tearDown() {
@Test
void shouldCompleteFuture() {
// when
sut.startupInternal(testBrokerStartupContext, CONCURRENCY_CONTROL, startupFuture);
actor.run(
() -> {
sut.startupInternal(testBrokerStartupContext, CONCURRENCY_CONTROL, startupFuture);
});

// then
assertThat(startupFuture).succeedsWithin(TIME_OUT);
Expand All @@ -105,7 +113,10 @@ void shouldCompleteFuture() {
@Test
void shouldStartAndInstallEmbeddedGatewayService() {
// when
sut.startupInternal(testBrokerStartupContext, CONCURRENCY_CONTROL, startupFuture);
actor.run(
() -> {
sut.startupInternal(testBrokerStartupContext, CONCURRENCY_CONTROL, startupFuture);
});
await().until(startupFuture::isDone);

// then
Expand Down
81 changes: 61 additions & 20 deletions gateway/src/main/java/io/camunda/zeebe/gateway/Gateway.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,24 +25,24 @@
import io.camunda.zeebe.gateway.interceptors.impl.DecoratedInterceptor;
import io.camunda.zeebe.gateway.interceptors.impl.InterceptorRepository;
import io.camunda.zeebe.gateway.query.impl.QueryApiImpl;
import io.camunda.zeebe.util.Either;
import io.camunda.zeebe.util.sched.Actor;
import io.camunda.zeebe.util.sched.ActorControl;
import io.camunda.zeebe.util.sched.ActorSchedulingService;
import io.camunda.zeebe.util.sched.future.ActorFuture;
import io.camunda.zeebe.util.sched.future.CompletableActorFuture;
import io.grpc.BindableService;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.ServerInterceptor;
import io.grpc.ServerInterceptors;
import io.grpc.ServerServiceDefinition;
import io.grpc.netty.NettyServerBuilder;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import me.dinowernli.grpc.prometheus.Configuration;
Expand Down Expand Up @@ -110,31 +110,65 @@ public BrokerClient getBrokerClient() {
return brokerClient;
}

public void start() throws IOException {
public ActorFuture<Gateway> start() {
final var resultFuture = new CompletableActorFuture<Gateway>();

healthManager.setStatus(Status.STARTING);
brokerClient = buildBrokerClient();

final var activateJobsHandler = buildActivateJobsHandler(brokerClient);
submitActorToActivateJobs((Consumer<ActorControl>) activateJobsHandler);
createAndStartActivateJobsHandler(brokerClient)
.whenComplete(
(activateJobsHandler, error) -> {
if (error != null) {
resultFuture.completeExceptionally(error);
return;
}

final var serverResult = createAndStartServer(activateJobsHandler);
if (serverResult.isLeft()) {
final var exception = serverResult.getLeft();
resultFuture.completeExceptionally(exception);
} else {
server = serverResult.get();
healthManager.setStatus(Status.RUNNING);
resultFuture.complete(this);
}
});

return resultFuture;
}

private Either<Exception, Server> createAndStartServer(
final ActivateJobsHandler activateJobsHandler) {
final EndpointManager endpointManager = new EndpointManager(brokerClient, activateJobsHandler);
final GatewayGrpcService gatewayGrpcService = new GatewayGrpcService(endpointManager);
final ServerBuilder<?> serverBuilder = serverBuilderFactory.apply(gatewayCfg);

try {
final var serverBuilder = serverBuilderFactory.apply(gatewayCfg);
applySecurityConfigurationIfEnabled(serverBuilder);
final var server = buildServer(serverBuilder, gatewayGrpcService);
server.start();
return Either.right(server);
} catch (Exception e) {
return Either.left(e);
}
}

private void applySecurityConfigurationIfEnabled(final ServerBuilder<?> serverBuilder) {
final SecurityCfg securityCfg = gatewayCfg.getSecurity();
if (securityCfg.isEnabled()) {
setSecurityConfig(serverBuilder, securityCfg);
}
}

server =
serverBuilder
.addService(applyInterceptors(gatewayGrpcService))
.addService(
ServerInterceptors.intercept(
healthManager.getHealthService(), MONITORING_SERVER_INTERCEPTOR))
.build();
server.start();
healthManager.setStatus(Status.RUNNING);
private Server buildServer(
final ServerBuilder<?> serverBuilder, final BindableService interceptorService) {
return serverBuilder
.addService(applyInterceptors(interceptorService))
.addService(
ServerInterceptors.intercept(
healthManager.getHealthService(), MONITORING_SERVER_INTERCEPTOR))
.build();
}

private static NettyServerBuilder setNetworkConfig(final NetworkCfg cfg) {
Expand Down Expand Up @@ -186,15 +220,22 @@ private BrokerClient buildBrokerClient() {
return brokerClientFactory.apply(gatewayCfg);
}

private void submitActorToActivateJobs(final Consumer<ActorControl> consumer) {
final var actorStartedFuture = new CompletableFuture<ActorControl>();
private CompletableFuture<ActivateJobsHandler> createAndStartActivateJobsHandler(
final BrokerClient brokerClient) {
final var handler = buildActivateJobsHandler(brokerClient);
return submitActorToActivateJobs(handler);
}

private CompletableFuture<ActivateJobsHandler> submitActorToActivateJobs(
final ActivateJobsHandler handler) {
final var future = new CompletableFuture<ActivateJobsHandler>();
final var actor =
Actor.newActor()
.name("ActivateJobsHandler")
.actorStartedHandler(consumer.andThen(actorStartedFuture::complete))
.actorStartedHandler(handler.andThen(t -> future.complete(handler)))
.build();
actorSchedulingService.submitActor(actor);
actorStartedFuture.join();
return future;
}

private ActivateJobsHandler buildActivateJobsHandler(final BrokerClient brokerClient) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@
import io.camunda.zeebe.gateway.grpc.ServerStreamObserver;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.ActivateJobsRequest;
import io.camunda.zeebe.gateway.protocol.GatewayOuterClass.ActivateJobsResponse;
import io.camunda.zeebe.util.sched.ActorControl;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

/** Can handle an 'activate jobs' request from a client. */
public interface ActivateJobsHandler {
public interface ActivateJobsHandler extends Consumer<ActorControl> {

static final AtomicLong ACTIVATE_JOBS_REQUEST_ID_GENERATOR = new AtomicLong(1);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,13 @@
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import org.slf4j.Logger;

/**
* Adds long polling to the handling of activate job requests. When there are no jobs available to
* activate, the response will be kept open.
*/
public final class LongPollingActivateJobsHandler
implements ActivateJobsHandler, Consumer<ActorControl> {
public final class LongPollingActivateJobsHandler implements ActivateJobsHandler {

private static final String JOBS_AVAILABLE_TOPIC = "jobsAvailable";
private static final Logger LOG = Loggers.GATEWAY_LOGGER;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,7 @@
* Iterates in round-robin fashion over partitions to activate jobs. Uses a map from job type to
* partition-IDs to determine the next partition to use.
*/
public final class RoundRobinActivateJobsHandler
implements ActivateJobsHandler, Consumer<ActorControl> {
public final class RoundRobinActivateJobsHandler implements ActivateJobsHandler {

private static final String ACTIVATE_JOB_NOT_SENT_MSG = "Failed to send activated jobs to client";
private static final String ACTIVATE_JOB_NOT_SENT_MSG_WITH_REASON =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ static void setUp() throws IOException {
cluster.getMembershipService(),
cluster.getEventService(),
actorScheduler);
gateway.start();
gateway.start().join();

final String gatewayAddress = NetUtil.toSocketAddressString(networkCfg.toSocketAddress());
client = ZeebeClient.newClientBuilder().gatewayAddress(gatewayAddress).usePlaintext().build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import io.camunda.zeebe.protocol.record.intent.Intent;
import io.camunda.zeebe.util.Either;
import io.camunda.zeebe.util.sched.Actor;
import io.camunda.zeebe.util.sched.ActorControl;
import io.camunda.zeebe.util.sched.clock.ControlledActorClock;
import io.camunda.zeebe.util.sched.testing.ActorSchedulerRule;
import io.grpc.Status.Code;
Expand Down Expand Up @@ -991,13 +990,13 @@ public BrokerResponse<?> handle(final BrokerActivateJobsRequest request)
}

private void submitActorToActivateJobs(final LongPollingActivateJobsHandler handler) {
final var actorStartedFuture = new CompletableFuture<ActorControl>();
final var future = new CompletableFuture<>();
final var actor =
Actor.newActor()
.name("LongPollingHandler-Test")
.actorStartedHandler(handler.andThen(actorStartedFuture::complete))
.actorStartedHandler(handler.andThen(future::complete))
.build();
actorSchedulerRule.submitActor(actor);
actorStartedFuture.join();
future.join();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,14 @@ public GatewayBlockingStub buildClient() {
}

private void submitActorToActivateJobs(final Consumer<ActorControl> consumer) {
final var actorStartedFuture = new CompletableFuture<ActorControl>();
final var future = new CompletableFuture<>();
final var actor =
Actor.newActor()
.name("ActivateJobsHandler")
.actorStartedHandler(consumer.andThen(actorStartedFuture::complete))
.actorStartedHandler(consumer.andThen(future::complete))
.build();
actorScheduler.submitActor(actor);
actorStartedFuture.join();
future.join();
}

private ActivateJobsHandler buildActivateJobsHandler(final BrokerClient brokerClient) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ void shouldAbortDeploymentCalls() throws IOException {
config.getInterceptors().add(interceptorCfg);

// when
gateway.start();
gateway.start().join();
try (final var client = createZeebeClient()) {
final Future<DeploymentEvent> result =
client
Expand All @@ -110,7 +110,7 @@ void shouldInjectQueryApiViaContext() throws IOException {
config.getInterceptors().add(interceptorCfg);

// when
gateway.start();
gateway.start().join();
try (final var client = createZeebeClient()) {
try {
client.newTopologyRequest().send().join();
Expand Down
Loading

0 comments on commit 239b7f0

Please sign in to comment.