Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Triple protocol http1 upgrade support #14026

Merged
merged 4 commits into from
May 11, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions dubbo-remoting/dubbo-remoting-netty4/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@
<classifier>linux-aarch_64</classifier>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec-http2</artifactId>
</dependency>

<dependency>
<groupId>org.apache.dubbo</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.ssl.ApplicationProtocolNames;
import io.netty.handler.ssl.ApplicationProtocolNegotiationHandler;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.ssl.SslHandshakeCompletionEvent;
Expand Down Expand Up @@ -119,48 +121,7 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) t
if (providerConnectionConfig != null && isSsl(in)) {
enableSsl(ctx, providerConnectionConfig);
} else {
Set<String> supportedProtocolNames = new HashSet<>(protocols.keySet());
supportedProtocolNames.retainAll(urlMapper.keySet());

for (final String name : supportedProtocolNames) {
WireProtocol protocol = protocols.get(name);
in.markReaderIndex();
ChannelBuffer buf = new NettyBackedChannelBuffer(in);
final ProtocolDetector.Result result = protocol.detector().detect(buf);
in.resetReaderIndex();
switch (result.flag()) {
case UNRECOGNIZED:
continue;
case RECOGNIZED:
ChannelHandler localHandler = this.handlerMapper.getOrDefault(name, handler);
URL localURL = this.urlMapper.getOrDefault(name, url);
channel.setUrl(localURL);
NettyConfigOperator operator = new NettyConfigOperator(channel, localHandler);
operator.setDetectResult(result);
protocol.configServerProtocolHandler(url, operator);
ctx.pipeline().remove(this);
case NEED_MORE_DATA:
return;
default:
return;
}
}
byte[] preface = new byte[in.readableBytes()];
in.readBytes(preface);
Set<String> supported = url.getApplicationModel()
.getExtensionLoader(WireProtocol.class)
.getSupportedExtensions();
LOGGER.error(
INTERNAL_ERROR,
"unknown error in remoting module",
"",
String.format(
"Can not recognize protocol from downstream=%s . " + "preface=%s protocols=%s",
ctx.channel().remoteAddress(), Bytes.bytes2hex(preface), supported));

// Unknown protocol; discard everything and close the connection.
in.clear();
ctx.close();
detectProtocol(ctx, url, channel, in);
}
}

Expand All @@ -171,6 +132,17 @@ private void enableSsl(ChannelHandlerContext ctx, ProviderCert providerConnectio
p.addLast(
"unificationA",
new NettyPortUnificationServerHandler(url, false, protocols, handler, urlMapper, handlerMapper));
p.addLast("ALPN", new ApplicationProtocolNegotiationHandler(ApplicationProtocolNames.HTTP_1_1) {
@Override
protected void configurePipeline(ChannelHandlerContext ctx, String protocol) throws Exception {
if (!ApplicationProtocolNames.HTTP_2.equals(protocol)) {
return;
}
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
ByteBuf in = ctx.alloc().buffer();
detectProtocol(ctx, url, channel, in);
}
});
p.remove(this);
}

Expand All @@ -181,4 +153,48 @@ private boolean isSsl(ByteBuf buf) {
}
return false;
}

private void detectProtocol(ChannelHandlerContext ctx, URL url, NettyChannel channel, ByteBuf in) {
Set<String> supportedProtocolNames = new HashSet<>(protocols.keySet());
supportedProtocolNames.retainAll(urlMapper.keySet());

for (final String name : supportedProtocolNames) {
WireProtocol protocol = protocols.get(name);
in.markReaderIndex();
ChannelBuffer buf = new NettyBackedChannelBuffer(in);
final ProtocolDetector.Result result = protocol.detector().detect(buf);
in.resetReaderIndex();
switch (result.flag()) {
case UNRECOGNIZED:
continue;
case RECOGNIZED:
ChannelHandler localHandler = this.handlerMapper.getOrDefault(name, handler);
URL localURL = this.urlMapper.getOrDefault(name, url);
channel.setUrl(localURL);
NettyConfigOperator operator = new NettyConfigOperator(channel, localHandler);
operator.setDetectResult(result);
protocol.configServerProtocolHandler(url, operator);
ctx.pipeline().remove(this);
case NEED_MORE_DATA:
return;
default:
return;
}
}
byte[] preface = new byte[in.readableBytes()];
in.readBytes(preface);
Set<String> supported =
walklown marked this conversation as resolved.
Show resolved Hide resolved
url.getApplicationModel().getExtensionLoader(WireProtocol.class).getSupportedExtensions();
LOGGER.error(
INTERNAL_ERROR,
"unknown error in remoting module",
"",
String.format(
"Can not recognize protocol from downstream=%s . " + "preface=%s protocols=%s",
ctx.channel().remoteAddress(), Bytes.bytes2hex(preface), supported));

// Unknown protocol; discard everything and close the connection.
in.clear();
ctx.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,15 @@
import java.security.Provider;
import java.security.Security;

import io.netty.handler.codec.http2.Http2SecurityUtil;
import io.netty.handler.ssl.ApplicationProtocolConfig;
import io.netty.handler.ssl.ApplicationProtocolNames;
import io.netty.handler.ssl.ClientAuth;
import io.netty.handler.ssl.OpenSsl;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.SslProvider;
import io.netty.handler.ssl.SupportedCipherSuiteFilter;

import static org.apache.dubbo.common.constants.LoggerCodeConstants.TRANSPORT_FAILED_CLOSE_STREAM;

Expand Down Expand Up @@ -77,7 +81,16 @@ public static SslContext buildServerSslContext(ProviderCert providerConnectionCo
safeCloseStream(serverPrivateKeyPathStream);
}
try {
return sslClientContextBuilder.sslProvider(findSslProvider()).build();
return sslClientContextBuilder
.sslProvider(findSslProvider())
.ciphers(Http2SecurityUtil.CIPHERS, SupportedCipherSuiteFilter.INSTANCE)
.applicationProtocolConfig(new ApplicationProtocolConfig(
ApplicationProtocolConfig.Protocol.ALPN,
ApplicationProtocolConfig.SelectorFailureBehavior.NO_ADVERTISE,
ApplicationProtocolConfig.SelectedListenerFailureBehavior.ACCEPT,
ApplicationProtocolNames.HTTP_2,
ApplicationProtocolNames.HTTP_1_1))
.build();
} catch (SSLException e) {
throw new IllegalStateException("Build SslSession failed.", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@
import org.apache.dubbo.remoting.http12.netty4.h2.NettyHttp2FrameCodec;
import org.apache.dubbo.remoting.http12.netty4.h2.NettyHttp2ProtocolSelectorHandler;
import org.apache.dubbo.remoting.utils.UrlUtils;
import org.apache.dubbo.rpc.model.ApplicationModel;
import org.apache.dubbo.rpc.model.FrameworkModel;
import org.apache.dubbo.rpc.model.ScopeModelAware;
import org.apache.dubbo.rpc.protocol.tri.h12.HttpServerAfterUpgradeHandler;
import org.apache.dubbo.rpc.protocol.tri.h12.TripleProtocolDetector;
import org.apache.dubbo.rpc.protocol.tri.h12.http1.DefaultHttp11ServerTransportListenerFactory;
import org.apache.dubbo.rpc.protocol.tri.h12.http2.GenericHttp2ServerTransportListenerFactory;
Expand All @@ -48,14 +50,18 @@
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.HttpServerUpgradeHandler;
import io.netty.handler.codec.http2.Http2CodecUtil;
import io.netty.handler.codec.http2.Http2FrameCodec;
import io.netty.handler.codec.http2.Http2FrameCodecBuilder;
import io.netty.handler.codec.http2.Http2FrameLogger;
import io.netty.handler.codec.http2.Http2MultiplexHandler;
import io.netty.handler.codec.http2.Http2ServerUpgradeCodec;
import io.netty.handler.codec.http2.Http2Settings;
import io.netty.handler.codec.http2.Http2StreamChannel;
import io.netty.handler.flush.FlushConsolidationHandler;
import io.netty.handler.logging.LogLevel;
import io.netty.util.AsciiString;

import static org.apache.dubbo.rpc.Constants.H2_SETTINGS_ENABLE_PUSH_KEY;
import static org.apache.dubbo.rpc.Constants.H2_SETTINGS_HEADER_TABLE_SIZE_KEY;
Expand Down Expand Up @@ -143,31 +149,52 @@ public void configServerProtocolHandler(URL url, ChannelOperator operator) {
}

private void configurerHttp1Handlers(URL url, List<ChannelHandler> handlers) {
handlers.add(new ChannelHandlerPretender(new HttpServerCodec()));
final HttpServerCodec sourceCodec = new HttpServerCodec();
handlers.add(new ChannelHandlerPretender(sourceCodec));
// Triple protocol http1 upgrade support
handlers.add(new ChannelHandlerPretender(new HttpServerUpgradeHandler(
sourceCodec,
protocol -> {
if (AsciiString.contentEquals(Http2CodecUtil.HTTP_UPGRADE_PROTOCOL_NAME, protocol)) {
Configuration config =
ConfigurationUtils.getGlobalConfiguration(url.getOrDefaultApplicationModel());
return new Http2ServerUpgradeCodec(
buildHttp2FrameCodec(config, url.getOrDefaultApplicationModel()),
new HttpServerAfterUpgradeHandler(),
new HttpWriteQueueHandler(),
new FlushConsolidationHandler(64, true),
new TripleServerConnectionHandler(),
buildHttp2MultiplexHandler(url),
new TripleTailHandler());
}
// Not upgrade request
return null;
},
Integer.MAX_VALUE)));
// If the upgrade was successful, remove the message from the output list
// so that it's not propagated to the next handler. This request will
// be propagated as a user event instead.
handlers.add(new ChannelHandlerPretender(new HttpObjectAggregator(Integer.MAX_VALUE)));
handlers.add(new ChannelHandlerPretender(new NettyHttp1Codec()));
handlers.add(new ChannelHandlerPretender(new NettyHttp1ConnectionHandler(
url, frameworkModel, DefaultHttp11ServerTransportListenerFactory.INSTANCE)));
}

private Http2MultiplexHandler buildHttp2MultiplexHandler(URL url) {
return new Http2MultiplexHandler(new ChannelInitializer<Http2StreamChannel>() {
@Override
protected void initChannel(Http2StreamChannel ch) {
final ChannelPipeline p = ch.pipeline();
p.addLast(new NettyHttp2FrameCodec());
p.addLast(new NettyHttp2ProtocolSelectorHandler(
url, frameworkModel, GenericHttp2ServerTransportListenerFactory.INSTANCE));
}
});
}

private void configurerHttp2Handlers(URL url, List<ChannelHandler> handlers) {
Configuration config = ConfigurationUtils.getGlobalConfiguration(url.getOrDefaultApplicationModel());
final Http2FrameCodec codec = TripleHttp2FrameCodecBuilder.forServer()
.customizeConnection((connection) -> connection
.remote()
.flowController(
new TriHttp2RemoteFlowController(connection, url.getOrDefaultApplicationModel())))
.gracefulShutdownTimeoutMillis(10000)
.initialSettings(new Http2Settings()
.headerTableSize(
config.getInt(H2_SETTINGS_HEADER_TABLE_SIZE_KEY, DEFAULT_SETTING_HEADER_LIST_SIZE))
.maxConcurrentStreams(config.getInt(H2_SETTINGS_MAX_CONCURRENT_STREAMS_KEY, Integer.MAX_VALUE))
.initialWindowSize(config.getInt(H2_SETTINGS_INITIAL_WINDOW_SIZE_KEY, DEFAULT_WINDOW_INIT_SIZE))
.maxFrameSize(config.getInt(H2_SETTINGS_MAX_FRAME_SIZE_KEY, DEFAULT_MAX_FRAME_SIZE))
.maxHeaderListSize(
config.getInt(H2_SETTINGS_MAX_HEADER_LIST_SIZE_KEY, DEFAULT_MAX_HEADER_LIST_SIZE)))
.frameLogger(SERVER_LOGGER)
.build();
final Http2FrameCodec codec = buildHttp2FrameCodec(config, url.getOrDefaultApplicationModel());
final Http2MultiplexHandler handler = new Http2MultiplexHandler(new ChannelInitializer<Http2StreamChannel>() {
walklown marked this conversation as resolved.
Show resolved Hide resolved
@Override
protected void initChannel(Http2StreamChannel ch) {
Expand All @@ -184,4 +211,22 @@ protected void initChannel(Http2StreamChannel ch) {
handlers.add(new ChannelHandlerPretender(handler));
handlers.add(new ChannelHandlerPretender(new TripleTailHandler()));
}

private Http2FrameCodec buildHttp2FrameCodec(Configuration config, ApplicationModel applicationModel) {
return TripleHttp2FrameCodecBuilder.forServer()
.customizeConnection((connection) -> connection
.remote()
.flowController(new TriHttp2RemoteFlowController(connection, applicationModel)))
.gracefulShutdownTimeoutMillis(10000)
.initialSettings(new Http2Settings()
.headerTableSize(
config.getInt(H2_SETTINGS_HEADER_TABLE_SIZE_KEY, DEFAULT_SETTING_HEADER_LIST_SIZE))
.maxConcurrentStreams(config.getInt(H2_SETTINGS_MAX_CONCURRENT_STREAMS_KEY, Integer.MAX_VALUE))
.initialWindowSize(config.getInt(H2_SETTINGS_INITIAL_WINDOW_SIZE_KEY, DEFAULT_WINDOW_INIT_SIZE))
.maxFrameSize(config.getInt(H2_SETTINGS_MAX_FRAME_SIZE_KEY, DEFAULT_MAX_FRAME_SIZE))
.maxHeaderListSize(
config.getInt(H2_SETTINGS_MAX_HEADER_LIST_SIZE_KEY, DEFAULT_MAX_HEADER_LIST_SIZE)))
.frameLogger(SERVER_LOGGER)
.build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.rpc.protocol.tri.h12;

import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.HttpServerUpgradeHandler;
import io.netty.handler.codec.http2.DefaultHttp2DataFrame;
import io.netty.handler.codec.http2.DefaultHttp2Headers;
import io.netty.handler.codec.http2.DefaultHttp2HeadersFrame;
import io.netty.handler.codec.http2.Http2CodecUtil;
import io.netty.handler.codec.http2.Http2FrameCodec;
import io.netty.handler.codec.http2.Http2FrameStream;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.InboundHttpToHttp2Adapter;

import static io.netty.handler.codec.http.HttpResponseStatus.OK;

/**
* If an upgrade occurred, the program need send a simple response via HTTP/2 on stream 1 (the stream specifically reserved
* for cleartext HTTP upgrade). However, {@link Http2FrameCodec} send 'upgradeRequest' to upgraded channel handlers by
* {@link InboundHttpToHttp2Adapter} (As it noted that this may behave strangely). So we need to distinguish the 'upgradeRequest'
* and send the response.<br/>
*
* @see HttpServerUpgradeHandler
* @see Http2FrameCodec
* @see InboundHttpToHttp2Adapter
* @since 3.3.0
*/
@Sharable
public class HttpServerAfterUpgradeHandler extends ChannelInboundHandlerAdapter {

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof DefaultHttp2HeadersFrame) {
DefaultHttp2HeadersFrame headersFrame = (DefaultHttp2HeadersFrame) msg;
if (headersFrame.stream().id() == Http2CodecUtil.HTTP_UPGRADE_STREAM_ID && headersFrame.isEndStream()) {
// upgradeRequest
sendResponse(ctx, headersFrame.stream());
return;
}
}
super.channelRead(ctx, msg);
}

/**
* Send a frame for the response status
*/
private static void sendResponse(ChannelHandlerContext ctx, Http2FrameStream stream) {
Http2Headers headers = new DefaultHttp2Headers().status(OK.codeAsText());
ctx.write(new DefaultHttp2HeadersFrame(headers).stream(stream));
ctx.write(new DefaultHttp2DataFrame(true).stream(stream));
}
}
Loading