gRPC 提供了 grpc.reflection.v1alpha.ServerReflection
服务,在 Server 端添加后可以通过该服务获取所有服务的信息,包括服务定义,方法,属性等;
可以根据获取到的服务信息调用其他的方法,实现泛化调用;gRPC 调试工具 grpcurl 和 gRPC Swagger 等工具都是通过这种方式实现的
参考 GRPC Server Reflection Protocol 和 reflection.proto
该服务只有一个双向流的方法 ServerReflectionInfo
,调用时根据请求参数不同,调用不同的方法进行处理,并返回响应;该方法的流控是非自动的,只有当一个请求完成之后才会获取下一个请求
service ServerReflection {
rpc ServerReflectionInfo(stream ServerReflectionRequest) returns (stream ServerReflectionResponse);
}
message ServerReflectionRequest {
string host = 1;
oneof message_request {
// 根据服务名查询 proto 文件
string file_by_filename = 3;
// 根据名称获取 proto 文件,如 <package>.<service>[.<method>] 或 <package>.<type>
string file_containing_symbol = 4;
// 根据 message 类型和序号获取 proto 文件
ExtensionRequest file_containing_extension = 5;
// 查找给定消息类型的所有已知扩展使用的标记号,并将它们以未定义的顺序附加到ExtensionNumberResponse
string all_extension_numbers_of_type = 6;
// 查询所有的服务
string list_services = 7;
}
}
- 服务实现
@Slf4j
public class ReflectionServer {
@SneakyThrows
public static void main(String[] args) {
// 构建 Server
Server server = NettyServerBuilder.forAddress(new InetSocketAddress(9090))
// 添加服务
.addService(new HelloServiceImpl())
// 添加反射服务
+ .addService(ProtoReflectionService.newInstance())
.build();
// 启动 Server
server.start();
log.info("服务端启动成功");
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
try {
server.awaitTermination(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
}));
// 保持运行
server.awaitTermination();
}
}
发起双向流请求
@Slf4j
public class ReflectionClient {
public static void main(String[] args) throws InterruptedException {
// 构建 Channel
ManagedChannel channel = ManagedChannelBuilder.forAddress("127.0.0.1", 9090)
.usePlaintext()
.build();
// 使用 Channel 构建 BlockingStub
ServerReflectionGrpc.ServerReflectionStub reflectionStub = ServerReflectionGrpc.newStub(channel);
StreamObserver<ServerReflectionResponse> streamObserver = new StreamObserver<ServerReflectionResponse>() {
@Override
public void onNext(ServerReflectionResponse response) {
log.info("{}", response);
}
@Override
public void onError(Throwable t) {
}
@Override
public void onCompleted() {
log.info("Complete");
}
};
StreamObserver<ServerReflectionRequest> requestStreamObserver = reflectionStub.serverReflectionInfo(streamObserver);
// 列举所有的服务
ServerReflectionRequest listServiceRequest = ServerReflectionRequest.newBuilder()
.setListServices("")
.build();
requestStreamObserver.onNext(listServiceRequest);
}
}
其他的方法使用请参考 ReflectionClient
在 Server 端启动时,将反射服务添加到服务中,当客户端触发调用后,会执行 io.grpc.protobuf.services.ProtoReflectionService.getRefreshedIndex
方法,会从 Server
中获取所有的可变和不可变的服务,遍历获取所有的服务、方法、属性,添加到 ServerReflectionIndex
对象中
- io.grpc.protobuf.services.ProtoReflectionService.getRefreshedIndex
private ServerReflectionIndex getRefreshedIndex() {
synchronized (lock) {
Server server = InternalServer.SERVER_CONTEXT_KEY.get();
ServerReflectionIndex index = serverReflectionIndexes.get(server);
if (index == null) {
index = new ServerReflectionIndex(server.getImmutableServices(), server.getMutableServices());
serverReflectionIndexes.put(server, index);
return index;
}
// 更新可变服务信息 ...
return index;
}
}
然后处理请求,会调用 io.grpc.protobuf.services.ProtoReflectionService.ProtoReflectionStreamObserver.handleReflectionRequest
方法,根据请求参数进行判断,使用不同的方法处理,并返回响应
private void handleReflectionRequest() {
if (serverCallStreamObserver.isReady()) {
switch (request.getMessageRequestCase()) {
case FILE_BY_FILENAME:
getFileByName(request);
break;
case FILE_CONTAINING_SYMBOL:
getFileContainingSymbol(request);
break;
case FILE_CONTAINING_EXTENSION:
getFileByExtension(request);
break;
case ALL_EXTENSION_NUMBERS_OF_TYPE:
getAllExtensions(request);
break;
case LIST_SERVICES:
listServices(request);
break;
default:
sendErrorResponse(request, Status.Code.UNIMPLEMENTED, "not implemented " + request.getMessageRequestCase());
}
request = null;
// 如果在发送完成后关闭,则关闭流,否则要求下一个请求
if (closeAfterSend) {
serverCallStreamObserver.onCompleted();
} else {
serverCallStreamObserver.request(1);
}
}
}