Skip to content

Commit

Permalink
RATIS-2168. Support custom gRPC services. (#1169)
Browse files Browse the repository at this point in the history
  • Loading branch information
szetszwo authored Oct 17, 2024
1 parent a15bde1 commit e96ed1a
Show file tree
Hide file tree
Showing 9 changed files with 339 additions and 39 deletions.
4 changes: 4 additions & 0 deletions ratis-grpc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.ratis</groupId>
<artifactId>ratis-server-api</artifactId>
</dependency>
<dependency>
<artifactId>ratis-server</artifactId>
<groupId>org.apache.ratis</groupId>
Expand Down
29 changes: 20 additions & 9 deletions ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcConfigKeys.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.ratis.conf.Parameters;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.grpc.server.GrpcServices;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.util.SizeInBytes;
import org.apache.ratis.util.TimeDuration;
Expand Down Expand Up @@ -230,15 +231,6 @@ static void setAsyncRequestThreadPoolSize(RaftProperties properties, int port) {
setInt(properties::setInt, ASYNC_REQUEST_THREAD_POOL_SIZE_KEY, port);
}

String TLS_CONF_PARAMETER = PREFIX + ".tls.conf";
Class<GrpcTlsConfig> TLS_CONF_CLASS = TLS.CONF_CLASS;
static GrpcTlsConfig tlsConf(Parameters parameters) {
return parameters != null ? parameters.get(TLS_CONF_PARAMETER, TLS_CONF_CLASS): null;
}
static void setTlsConf(Parameters parameters, GrpcTlsConfig conf) {
parameters.put(TLS_CONF_PARAMETER, conf, TLS_CONF_CLASS);
}

String LEADER_OUTSTANDING_APPENDS_MAX_KEY = PREFIX + ".leader.outstanding.appends.max";
int LEADER_OUTSTANDING_APPENDS_MAX_DEFAULT = 8;
static int leaderOutstandingAppendsMax(RaftProperties properties) {
Expand Down Expand Up @@ -301,6 +293,25 @@ static boolean zeroCopyEnabled(RaftProperties properties) {
static void setZeroCopyEnabled(RaftProperties properties, boolean enabled) {
setBoolean(properties::setBoolean, ZERO_COPY_ENABLED_KEY, enabled);
}

String SERVICES_CUSTOMIZER_PARAMETER = PREFIX + ".services.customizer";
Class<GrpcServices.Customizer> SERVICES_CUSTOMIZER_CLASS = GrpcServices.Customizer.class;
static GrpcServices.Customizer servicesCustomizer(Parameters parameters) {
return parameters == null ? null
: parameters.get(SERVICES_CUSTOMIZER_PARAMETER, SERVICES_CUSTOMIZER_CLASS);
}
static void setServicesCustomizer(Parameters parameters, GrpcServices.Customizer customizer) {
parameters.put(SERVICES_CUSTOMIZER_PARAMETER, customizer, SERVICES_CUSTOMIZER_CLASS);
}

String TLS_CONF_PARAMETER = PREFIX + ".tls.conf";
Class<GrpcTlsConfig> TLS_CONF_CLASS = TLS.CONF_CLASS;
static GrpcTlsConfig tlsConf(Parameters parameters) {
return parameters != null ? parameters.get(TLS_CONF_PARAMETER, TLS_CONF_CLASS): null;
}
static void setTlsConf(Parameters parameters, GrpcTlsConfig conf) {
parameters.put(TLS_CONF_PARAMETER, conf, TLS_CONF_CLASS);
}
}

String MESSAGE_SIZE_MAX_KEY = PREFIX + ".message.size.max";
Expand Down
19 changes: 13 additions & 6 deletions ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.grpc.client.GrpcClientRpc;
import org.apache.ratis.grpc.server.GrpcLogAppender;
import org.apache.ratis.grpc.server.GrpcService;
import org.apache.ratis.grpc.server.GrpcServices;
import org.apache.ratis.grpc.server.GrpcServicesImpl;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.rpc.SupportedRpcType;
import org.apache.ratis.server.RaftServer;
Expand Down Expand Up @@ -64,6 +65,8 @@ static boolean checkPooledByteBufAllocatorUseCacheForAllThreads(Consumer<String>
return value;
}

private final GrpcServices.Customizer servicesCustomizer;

private final GrpcTlsConfig tlsConfig;
private final GrpcTlsConfig adminTlsConfig;
private final GrpcTlsConfig clientTlsConfig;
Expand All @@ -76,7 +79,7 @@ public static Parameters newRaftParameters(GrpcTlsConfig conf) {
}

public GrpcFactory(Parameters parameters) {
this(
this(GrpcConfigKeys.Server.servicesCustomizer(parameters),
GrpcConfigKeys.TLS.conf(parameters),
GrpcConfigKeys.Admin.tlsConf(parameters),
GrpcConfigKeys.Client.tlsConf(parameters),
Expand All @@ -85,11 +88,14 @@ public GrpcFactory(Parameters parameters) {
}

public GrpcFactory(GrpcTlsConfig tlsConfig) {
this(tlsConfig, null, null, null);
this(null, tlsConfig, null, null, null);
}

private GrpcFactory(GrpcTlsConfig tlsConfig, GrpcTlsConfig adminTlsConfig,
private GrpcFactory(GrpcServices.Customizer servicesCustomizer,
GrpcTlsConfig tlsConfig, GrpcTlsConfig adminTlsConfig,
GrpcTlsConfig clientTlsConfig, GrpcTlsConfig serverTlsConfig) {
this.servicesCustomizer = servicesCustomizer;

this.tlsConfig = tlsConfig;
this.adminTlsConfig = adminTlsConfig;
this.clientTlsConfig = clientTlsConfig;
Expand Down Expand Up @@ -123,10 +129,11 @@ public LogAppender newLogAppender(RaftServer.Division server, LeaderState state,
}

@Override
public GrpcService newRaftServerRpc(RaftServer server) {
public GrpcServices newRaftServerRpc(RaftServer server) {
checkPooledByteBufAllocatorUseCacheForAllThreads(LOG::info);
return GrpcService.newBuilder()
return GrpcServicesImpl.newBuilder()
.setServer(server)
.setCustomizer(servicesCustomizer)
.setAdminTlsConfig(getAdminTlsConfig())
.setServerTlsConfig(getServerTlsConfig())
.setClientTlsConfig(getClientTlsConfig())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,8 @@ public GrpcLogAppender(RaftServer.Division server, LeaderState leaderState, Foll
}

@Override
public GrpcService getServerRpc() {
return (GrpcService)super.getServerRpc();
public GrpcServicesImpl getServerRpc() {
return (GrpcServicesImpl)super.getServerRpc();
}

private GrpcServerProtocolClient getClient() throws IOException {
Expand Down Expand Up @@ -428,7 +428,7 @@ private static void sleep(TimeDuration waitTime, boolean heartbeat)

private void sendRequest(AppendEntriesRequest request,
AppendEntriesRequestProto proto) throws InterruptedIOException {
CodeInjectionForTesting.execute(GrpcService.GRPC_SEND_SERVER_REQUEST,
CodeInjectionForTesting.execute(GrpcServicesImpl.GRPC_SEND_SERVER_REQUEST,
getServer().getId(), null, proto);
resetHeartbeatTrigger();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ratis.grpc.server;

import org.apache.ratis.server.RaftServerRpc;
import org.apache.ratis.thirdparty.io.grpc.netty.NettyServerBuilder;

import java.util.EnumSet;

/** The gRPC services extending {@link RaftServerRpc}. */
public interface GrpcServices extends RaftServerRpc {
/** The type of the services. */
enum Type {ADMIN, CLIENT, SERVER}

/**
* To customize the services.
* For example, add a custom service.
*/
interface Customizer {
/** The default NOOP {@link Customizer}. */
class Default implements Customizer {
private static final Default INSTANCE = new Default();

@Override
public NettyServerBuilder customize(NettyServerBuilder builder, EnumSet<GrpcServices.Type> types) {
return builder;
}
}

static Customizer getDefaultInstance() {
return Default.INSTANCE;
}

/**
* Customize the given builder for the given types.
*
* @return the customized builder.
*/
NettyServerBuilder customize(NettyServerBuilder builder, EnumSet<GrpcServices.Type> types);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.ratis.grpc.GrpcConfigKeys;
import org.apache.ratis.grpc.GrpcTlsConfig;
import org.apache.ratis.grpc.GrpcUtil;
import org.apache.ratis.grpc.metrics.MessageMetrics;
import org.apache.ratis.grpc.metrics.ZeroCopyMetrics;
import org.apache.ratis.grpc.metrics.intercept.server.MetricServerInterceptor;
import org.apache.ratis.protocol.AdminAsynchronousProtocol;
Expand Down Expand Up @@ -51,6 +52,7 @@

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
Expand All @@ -60,11 +62,12 @@
import static org.apache.ratis.thirdparty.io.netty.handler.ssl.SslProvider.OPENSSL;

/** A grpc implementation of {@link org.apache.ratis.server.RaftServerRpc}. */
public final class GrpcService extends RaftServerRpcWithProxy<GrpcServerProtocolClient,
PeerProxyMap<GrpcServerProtocolClient>> {
static final Logger LOG = LoggerFactory.getLogger(GrpcService.class);
public final class GrpcServicesImpl
extends RaftServerRpcWithProxy<GrpcServerProtocolClient, PeerProxyMap<GrpcServerProtocolClient>>
implements GrpcServices {
static final Logger LOG = LoggerFactory.getLogger(GrpcServicesImpl.class);
public static final String GRPC_SEND_SERVER_REQUEST =
JavaUtils.getClassSimpleName(GrpcService.class) + ".sendRequest";
JavaUtils.getClassSimpleName(GrpcServicesImpl.class) + ".sendRequest";

class AsyncService implements RaftServerAsynchronousProtocol {

Expand Down Expand Up @@ -102,6 +105,7 @@ public void onCompleted() {

public static final class Builder {
private RaftServer server;
private Customizer customizer;

private String adminHost;
private int adminPort;
Expand Down Expand Up @@ -150,6 +154,11 @@ public Builder setServer(RaftServer raftServer) {
return this;
}

public Builder setCustomizer(Customizer customizer) {
this.customizer = customizer != null? customizer : Customizer.getDefaultInstance();
return this;
}

private GrpcServerProtocolClient newGrpcServerProtocolClient(RaftPeer target) {
return new GrpcServerProtocolClient(target, flowControlWindow.getSizeInt(),
requestTimeoutDuration, serverTlsConfig, separateHeartbeatChannel);
Expand Down Expand Up @@ -177,6 +186,10 @@ private MetricServerInterceptor newMetricServerInterceptor() {
JavaUtils.getClassSimpleName(getClass()) + "_" + serverPort);
}

Server buildServer(NettyServerBuilder builder, EnumSet<GrpcServices.Type> types) {
return customizer.customize(builder, types).build();
}

private NettyServerBuilder newNettyServerBuilderForServer() {
return newNettyServerBuilder(serverHost, serverPort, serverTlsConfig);
}
Expand Down Expand Up @@ -223,21 +236,24 @@ private boolean separateClientServer() {
}

Server newServer(GrpcClientProtocolService client, ZeroCopyMetrics zeroCopyMetrics, ServerInterceptor interceptor) {
final EnumSet<GrpcServices.Type> types = EnumSet.of(GrpcServices.Type.SERVER);
final NettyServerBuilder serverBuilder = newNettyServerBuilderForServer();
final ServerServiceDefinition service = newGrpcServerProtocolService(zeroCopyMetrics).bindServiceWithZeroCopy();
serverBuilder.addService(ServerInterceptors.intercept(service, interceptor));

if (!separateAdminServer()) {
types.add(GrpcServices.Type.ADMIN);
addAdminService(serverBuilder, server, interceptor);
}
if (!separateClientServer()) {
types.add(GrpcServices.Type.CLIENT);
addClientService(serverBuilder, client, interceptor);
}
return serverBuilder.build();
return buildServer(serverBuilder, types);
}

public GrpcService build() {
return new GrpcService(this);
public GrpcServicesImpl build() {
return new GrpcServicesImpl(this);
}

public Builder setAdminTlsConfig(GrpcTlsConfig config) {
Expand Down Expand Up @@ -273,11 +289,7 @@ public static Builder newBuilder() {
private final MetricServerInterceptor serverInterceptor;
private final ZeroCopyMetrics zeroCopyMetrics = new ZeroCopyMetrics();

public MetricServerInterceptor getServerInterceptor() {
return serverInterceptor;
}

private GrpcService(Builder b) {
private GrpcServicesImpl(Builder b) {
super(b.server::getId, id -> new PeerProxyMap<>(id.toString(), b::newGrpcServerProtocolClient));

this.executor = b.newExecutor();
Expand All @@ -291,7 +303,7 @@ private GrpcService(Builder b) {
if (b.separateAdminServer()) {
final NettyServerBuilder builder = b.newNettyServerBuilderForAdmin();
addAdminService(builder, b.server, serverInterceptor);
final Server adminServer = builder.build();
final Server adminServer = b.buildServer(builder, EnumSet.of(GrpcServices.Type.ADMIN));
servers.put(GrpcAdminProtocolService.class.getName(), adminServer);
adminServerAddressSupplier = newAddressSupplier(b.adminPort, adminServer);
} else {
Expand All @@ -301,7 +313,7 @@ private GrpcService(Builder b) {
if (b.separateClientServer()) {
final NettyServerBuilder builder = b.newNettyServerBuilderForClient();
addClientService(builder, clientProtocolService, serverInterceptor);
final Server clientServer = builder.build();
final Server clientServer = b.buildServer(builder, EnumSet.of(GrpcServices.Type.CLIENT));
servers.put(GrpcClientProtocolService.class.getName(), clientServer);
clientServerAddressSupplier = newAddressSupplier(b.clientPort, clientServer);
} else {
Expand Down Expand Up @@ -419,6 +431,11 @@ public StartLeaderElectionReplyProto startLeaderElection(StartLeaderElectionRequ
return getProxies().getProxy(target).startLeaderElection(request);
}

@VisibleForTesting
MessageMetrics getMessageMetrics() {
return serverInterceptor.getMetrics();
}

@VisibleForTesting
public ZeroCopyMetrics getZeroCopyMetrics() {
return zeroCopyMetrics;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.apache.ratis.conf.Parameters;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.grpc.metrics.ZeroCopyMetrics;
import org.apache.ratis.grpc.server.GrpcService;
import org.apache.ratis.grpc.server.GrpcServicesImpl;
import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
Expand Down Expand Up @@ -63,7 +63,7 @@ default Factory<MiniRaftClusterWithGrpc> getFactory() {
}

public static final DelayLocalExecutionInjection SEND_SERVER_REQUEST_INJECTION =
new DelayLocalExecutionInjection(GrpcService.GRPC_SEND_SERVER_REQUEST);
new DelayLocalExecutionInjection(GrpcServicesImpl.GRPC_SEND_SERVER_REQUEST);

public MiniRaftClusterWithGrpc(String[] ids, RaftProperties properties, Parameters parameters) {
this(ids, new String[0], properties, parameters);
Expand Down Expand Up @@ -102,7 +102,7 @@ public void assertZeroCopyMetrics() {
getServers().forEach(server -> server.getGroupIds().forEach(id -> {
LOG.info("Checking {}-{}", server.getId(), id);
RaftServer.Division division = RaftServerTestUtil.getDivision(server, id);
GrpcService service = (GrpcService) RaftServerTestUtil.getServerRpc(division);
final GrpcServicesImpl service = (GrpcServicesImpl) RaftServerTestUtil.getServerRpc(division);
ZeroCopyMetrics zeroCopyMetrics = service.getZeroCopyMetrics();
Assert.assertEquals(0, zeroCopyMetrics.nonZeroCopyMessages());
Assert.assertEquals("Zero copy messages are not released, please check logs to find leaks. ",
Expand Down
Loading

0 comments on commit e96ed1a

Please sign in to comment.