Skip to content

Commit

Permalink
Unix domain socket support for the HttpClient
Browse files Browse the repository at this point in the history
  • Loading branch information
shs96c committed Mar 12, 2020
1 parent 70e74bb commit 9f9f0e5
Show file tree
Hide file tree
Showing 8 changed files with 1,879 additions and 1,254 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,14 @@ java_library(
artifact("com.google.guava:guava"),
artifact("com.typesafe.netty:netty-reactive-streams"),
artifact("org.asynchttpclient:async-http-client"),
artifact("io.netty:netty-buffer"),
artifact("io.netty:netty-codec-http"),
artifact("io.netty:netty-transport"),
artifact("io.netty:netty-transport-native-epoll"),
artifact("io.netty:netty-transport-native-epoll-linux-x86_64"),
artifact("io.netty:netty-transport-native-kqueue"),
artifact("io.netty:netty-transport-native-kqueue-osx-x86_64"),
artifact("io.netty:netty-transport-native-unix-common"),

],
)
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ public static class Factory implements HttpClient.Factory {
public HttpClient createClient(ClientConfig config) {
Objects.requireNonNull(config, "Client config to use must be set.");

if ("unix".equals(config.baseUri().getScheme())) {
return new NettyDomainSocketClient(config);
}

return new NettyClient(new NettyHttpHandler(config, httpClient).with(config.filter()), NettyWebSocket.create(config, httpClient));
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
// Licensed to the Software Freedom Conservancy (SFC) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The SFC licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.openqa.selenium.remote.http.netty;

import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.io.ByteStreams;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBufInputStream;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollDomainSocketChannel;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.kqueue.KQueue;
import io.netty.channel.kqueue.KQueueDomainSocketChannel;
import io.netty.channel.kqueue.KQueueEventLoopGroup;
import io.netty.channel.unix.DomainSocketAddress;
import io.netty.channel.unix.UnixChannel;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpContentDecompressor;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpObject;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpVersion;
import org.openqa.selenium.remote.http.ClientConfig;
import org.openqa.selenium.remote.http.Contents;
import org.openqa.selenium.remote.http.HttpClient;
import org.openqa.selenium.remote.http.HttpHandler;
import org.openqa.selenium.remote.http.HttpRequest;
import org.openqa.selenium.remote.http.HttpResponse;
import org.openqa.selenium.remote.http.RemoteCall;
import org.openqa.selenium.remote.http.WebSocket;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.net.URI;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import static java.net.HttpURLConnection.HTTP_INTERNAL_ERROR;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

class NettyDomainSocketClient extends RemoteCall implements HttpClient {

private final EventLoopGroup eventLoopGroup;
private final Class<? extends Channel> channelClazz;
private final String path;
private final HttpHandler handler;

public NettyDomainSocketClient(ClientConfig config) {
super(config);
URI uri = config.baseUri();
Preconditions.checkArgument("unix".equals(uri.getScheme()), "URI scheme must be `unix`");

if (Epoll.isAvailable()) {
this.eventLoopGroup = new EpollEventLoopGroup();
this.channelClazz = EpollDomainSocketChannel.class;
} else if (KQueue.isAvailable()) {
this.eventLoopGroup = new KQueueEventLoopGroup();
this.channelClazz = KQueueDomainSocketChannel.class;
} else {
throw new IllegalStateException("No native library for unix domain sockets is available");
}

this.path = uri.getPath();
this.handler = config.filter().andFinally(this);
}

@Override
public HttpResponse execute(HttpRequest req) throws UncheckedIOException {
Objects.requireNonNull(req, "Request to send must be set.");

AtomicReference<HttpResponse> outRef = new AtomicReference<>();
CountDownLatch latch = new CountDownLatch(1);
Channel channel = createChannel(outRef, latch);

StringBuilder uri = new StringBuilder(req.getUri());
List<String> queryPairs = new ArrayList<>();
req.getQueryParameterNames().forEach(
name -> req.getQueryParameters(name).forEach(
value -> queryPairs.add(URLEncoder.encode(name, UTF_8) + "=" + URLEncoder.encode(value, UTF_8))));
if (!queryPairs.isEmpty()) {
uri.append("?");
Joiner.on('&').appendTo(uri, queryPairs);
}

DefaultFullHttpRequest fullRequest = new DefaultFullHttpRequest(
HttpVersion.HTTP_1_1,
HttpMethod.valueOf(req.getMethod().toString()),
uri.toString());
req.getHeaderNames().forEach(name -> req.getHeaders(name).forEach(value -> fullRequest.headers().add(name, value)));
fullRequest.headers().set(HttpHeaderNames.HOST, "localhost");
fullRequest.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE);

ChannelFuture future = channel.writeAndFlush(fullRequest);

try {
future.get();
channel.closeFuture().sync();
} catch (InterruptedException | ExecutionException e) {
Thread.currentThread().interrupt();
throw new UncheckedIOException(new IOException(e));
}

try {
latch.await(getConfig().readTimeout().toMillis(), MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}

return outRef.get();
}

@Override
public WebSocket openSocket(HttpRequest request, WebSocket.Listener listener) {
throw new UnsupportedOperationException("openSocket");
}

private Channel createChannel(AtomicReference<HttpResponse> outRef, CountDownLatch latch) {
Bootstrap bootstrap = new Bootstrap()
.group(eventLoopGroup)
.channel(channelClazz)
.handler(new ChannelInitializer<UnixChannel>() {
@Override
public void initChannel(UnixChannel ch) {
ch
.pipeline()
.addLast(new HttpClientCodec())
.addLast(new HttpContentDecompressor())
.addLast(new HttpObjectAggregator(Integer.MAX_VALUE))
.addLast(new SimpleChannelInboundHandler<FullHttpResponse>() {
@Override
public void channelRead0(ChannelHandlerContext ctx, FullHttpResponse msg) {
HttpResponse res = new HttpResponse().setStatus(msg.status().code());
msg.headers().forEach(entry -> res.addHeader(entry.getKey(), entry.getValue()));

try (InputStream is = new ByteBufInputStream(msg.content());
ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
ByteStreams.copy(is, bos);
res.setContent(Contents.bytes(bos.toByteArray()));
outRef.set(res);
latch.countDown();
} catch (IOException e) {
outRef.set(new HttpResponse()
.setStatus(HTTP_INTERNAL_ERROR)
.setContent(Contents.string(Throwables.getStackTraceAsString(e), UTF_8)));
latch.countDown();
}
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
outRef.set(new HttpResponse()
.setStatus(HTTP_INTERNAL_ERROR)
.setContent(Contents.string(Throwables.getStackTraceAsString(cause), UTF_8)));
latch.countDown();
}
});
}
});
try {
return bootstrap.connect(new DomainSocketAddress(path)).sync().channel();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}
}
26 changes: 26 additions & 0 deletions java/client/test/org/openqa/selenium/remote/http/netty/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
load("@rules_jvm_external//:defs.bzl", "artifact")
load("//java:defs.bzl", "java_test_suite")

java_test_suite(
name = "medium-tests",
size = "medium",
srcs = glob(["*.java"]),
deps = [
"//java/client/src/org/openqa/selenium/remote/http",
"//java/client/src/org/openqa/selenium/remote/http/netty",
"//java/client/test/org/openqa/selenium/testing:test-base",
artifact("com.google.guava:guava"),
artifact("junit:junit"),
artifact("org.assertj:assertj-core"),
artifact("io.netty:netty-buffer"),
artifact("io.netty:netty-codec-http"),
artifact("io.netty:netty-handler"),
artifact("io.netty:netty-transport"),
artifact("io.netty:netty-transport-native-epoll"),
artifact("io.netty:netty-transport-native-epoll-linux-x86_64"),
artifact("io.netty:netty-transport-native-kqueue"),
artifact("io.netty:netty-transport-native-kqueue-osx-x86_64"),
artifact("io.netty:netty-transport-native-unix-common"),
artifact("org.slf4j:slf4j-jdk14"),
]
)
Loading

0 comments on commit 9f9f0e5

Please sign in to comment.