From 8a60245e0bb63bdd3330859ca313c5581b1600d4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?A=CC=81lvaro=20Sa=CC=81nchez?= Date: Wed, 9 Oct 2019 13:02:53 +0200 Subject: [PATCH] Issue #335 --- .../mqtt/MqttWebSocketConfigImpl.java | 45 +++++---- .../mqtt/MqttWebSocketConfigImplBuilder.java | 96 +++++++++++-------- .../websocket/MqttWebSocketInitializer.java | 67 +++++++------ .../client/mqtt/MqttWebSocketConfig.java | 41 +++++--- 4 files changed, 142 insertions(+), 107 deletions(-) diff --git a/src/main/java/com/hivemq/client/internal/mqtt/MqttWebSocketConfigImpl.java b/src/main/java/com/hivemq/client/internal/mqtt/MqttWebSocketConfigImpl.java index 1f0622a37..841dd3cd9 100644 --- a/src/main/java/com/hivemq/client/internal/mqtt/MqttWebSocketConfigImpl.java +++ b/src/main/java/com/hivemq/client/internal/mqtt/MqttWebSocketConfigImpl.java @@ -16,40 +16,29 @@ package com.hivemq.client.internal.mqtt; -import com.hivemq.client.mqtt.MqttWebSocketConfig; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; +import com.hivemq.client.mqtt.MqttWebSocketConfig; + /** * @author David Katz * @author Christian Hoff */ public class MqttWebSocketConfigImpl implements MqttWebSocketConfig { - static final @NotNull MqttWebSocketConfigImpl DEFAULT = - new MqttWebSocketConfigImpl(DEFAULT_SERVER_PATH, DEFAULT_MQTT_SUBPROTOCOL); + static final @NotNull MqttWebSocketConfigImpl DEFAULT = new MqttWebSocketConfigImpl(DEFAULT_SERVER_PATH, DEFAULT_QUERY_STRING, + DEFAULT_MQTT_SUBPROTOCOL); private final @NotNull String serverPath; + private final @NotNull String queryString; private final @NotNull String subprotocol; - MqttWebSocketConfigImpl(final @NotNull String serverPath, final @NotNull String subprotocol) { + MqttWebSocketConfigImpl(final @NotNull String serverPath, final @NotNull String queryString, final @NotNull String subprotocol) { this.serverPath = serverPath; + this.queryString = queryString; this.subprotocol = subprotocol; - } - - @Override - public @NotNull String getServerPath() { - return serverPath; - } - @Override - public @NotNull String getSubprotocol() { - return subprotocol; - } - - @Override - public @NotNull MqttWebSocketConfigImplBuilder.Default extend() { - return new MqttWebSocketConfigImplBuilder.Default(this); } @Override @@ -65,6 +54,26 @@ public boolean equals(final @Nullable Object o) { return serverPath.equals(that.serverPath) && subprotocol.equals(that.subprotocol); } + @Override + public @NotNull MqttWebSocketConfigImplBuilder.Default extend() { + return new MqttWebSocketConfigImplBuilder.Default(this); + } + + @Override + public @NotNull String getQueryString() { + return queryString; + } + + @Override + public @NotNull String getServerPath() { + return serverPath; + } + + @Override + public @NotNull String getSubprotocol() { + return subprotocol; + } + @Override public int hashCode() { int result = serverPath.hashCode(); diff --git a/src/main/java/com/hivemq/client/internal/mqtt/MqttWebSocketConfigImplBuilder.java b/src/main/java/com/hivemq/client/internal/mqtt/MqttWebSocketConfigImplBuilder.java index e439aeb94..8ea1660f3 100644 --- a/src/main/java/com/hivemq/client/internal/mqtt/MqttWebSocketConfigImplBuilder.java +++ b/src/main/java/com/hivemq/client/internal/mqtt/MqttWebSocketConfigImplBuilder.java @@ -17,68 +17,40 @@ package com.hivemq.client.internal.mqtt; -import com.hivemq.client.internal.util.Checks; -import com.hivemq.client.mqtt.MqttWebSocketConfigBuilder; +import java.util.function.Function; + import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; -import java.util.function.Function; +import com.hivemq.client.internal.util.Checks; +import com.hivemq.client.mqtt.MqttWebSocketConfigBuilder; /** * @author Silvio Giebl */ public abstract class MqttWebSocketConfigImplBuilder> { - private @NotNull String serverPath = MqttWebSocketConfigImpl.DEFAULT_SERVER_PATH; - private @NotNull String subprotocol = MqttWebSocketConfigImpl.DEFAULT_MQTT_SUBPROTOCOL; - - MqttWebSocketConfigImplBuilder() {} - - MqttWebSocketConfigImplBuilder(final @Nullable MqttWebSocketConfigImpl webSocketConfig) { - if (webSocketConfig != null) { - serverPath = webSocketConfig.getServerPath(); - subprotocol = webSocketConfig.getSubprotocol(); - } - } - - abstract @NotNull B self(); - - public @NotNull B serverPath(final @Nullable String serverPath) { - // remove any leading slashes - this.serverPath = Checks.notNull(serverPath, "Server path").replaceAll("^/+", ""); - return self(); - } - - public @NotNull B subprotocol(final @Nullable String subprotocol) { - this.subprotocol = Checks.notNull(subprotocol, "Subprotocol"); - return self(); - } - - public @NotNull MqttWebSocketConfigImpl build() { - return new MqttWebSocketConfigImpl(serverPath, subprotocol); - } - public static class Default extends MqttWebSocketConfigImplBuilder implements MqttWebSocketConfigBuilder { - public Default() {} + public Default() { + } Default(final @Nullable MqttWebSocketConfigImpl webSocketConfig) { super(webSocketConfig); } @Override - @NotNull Default self() { + @NotNull + Default self() { return this; } } - public static class Nested

extends MqttWebSocketConfigImplBuilder> - implements MqttWebSocketConfigBuilder.Nested

{ + public static class Nested

extends MqttWebSocketConfigImplBuilder> implements MqttWebSocketConfigBuilder.Nested

{ private final @NotNull Function parentConsumer; - public Nested( - final @Nullable MqttWebSocketConfigImpl webSocketConfig, + public Nested(final @Nullable MqttWebSocketConfigImpl webSocketConfig, final @NotNull Function parentConsumer) { super(webSocketConfig); @@ -86,13 +58,53 @@ public Nested( } @Override - @NotNull Nested

self() { - return this; + public @NotNull P applyWebSocketConfig() { + return parentConsumer.apply(build()); } @Override - public @NotNull P applyWebSocketConfig() { - return parentConsumer.apply(build()); + @NotNull + Nested

self() { + return this; } } + + private @NotNull String serverPath = MqttWebSocketConfigImpl.DEFAULT_SERVER_PATH; + + private @NotNull String queryString = MqttWebSocketConfigImpl.DEFAULT_QUERY_STRING; + + private @NotNull String subprotocol = MqttWebSocketConfigImpl.DEFAULT_MQTT_SUBPROTOCOL; + + MqttWebSocketConfigImplBuilder() { + } + + MqttWebSocketConfigImplBuilder(final @Nullable MqttWebSocketConfigImpl webSocketConfig) { + if (webSocketConfig != null) { + serverPath = webSocketConfig.getServerPath(); + queryString = webSocketConfig.getQueryString(); + subprotocol = webSocketConfig.getSubprotocol(); + } + } + + public @NotNull MqttWebSocketConfigImpl build() { + return new MqttWebSocketConfigImpl(serverPath, queryString, subprotocol); + } + + public @NotNull B queryString(final @Nullable String queryString) { + this.queryString = Checks.notNull(queryString, "QueryString"); + return self(); + } + + public @NotNull B serverPath(final @Nullable String serverPath) { + // remove any leading slashes + this.serverPath = Checks.notNull(serverPath, "Server path").replaceAll("^/+", ""); + return self(); + } + + public @NotNull B subprotocol(final @Nullable String subprotocol) { + this.subprotocol = Checks.notNull(subprotocol, "Subprotocol"); + return self(); + } + + abstract @NotNull B self(); } diff --git a/src/main/java/com/hivemq/client/internal/mqtt/handler/websocket/MqttWebSocketInitializer.java b/src/main/java/com/hivemq/client/internal/mqtt/handler/websocket/MqttWebSocketInitializer.java index ebf7d8ac8..298fd3be7 100644 --- a/src/main/java/com/hivemq/client/internal/mqtt/handler/websocket/MqttWebSocketInitializer.java +++ b/src/main/java/com/hivemq/client/internal/mqtt/handler/websocket/MqttWebSocketInitializer.java @@ -17,11 +17,19 @@ package com.hivemq.client.internal.mqtt.handler.websocket; +import java.net.URI; +import java.net.URISyntaxException; + +import javax.inject.Inject; + +import org.jetbrains.annotations.NotNull; + import com.hivemq.client.internal.mqtt.MqttClientConfig; import com.hivemq.client.internal.mqtt.MqttWebSocketConfigImpl; import com.hivemq.client.internal.mqtt.datatypes.MqttVariableByteInteger; import com.hivemq.client.internal.mqtt.handler.MqttChannelInitializer; import com.hivemq.client.internal.mqtt.ioc.ConnectionScope; + import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; @@ -29,11 +37,6 @@ import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.websocketx.WebSocketClientProtocolHandler; import io.netty.handler.codec.http.websocketx.WebSocketVersion; -import org.jetbrains.annotations.NotNull; - -import javax.inject.Inject; -import java.net.URI; -import java.net.URISyntaxException; /** * @author Silvio Giebl @@ -56,8 +59,8 @@ public class MqttWebSocketInitializer extends ChannelInboundHandlerAdapter { private final @NotNull WebSocketBinaryFrameDecoder webSocketBinaryFrameDecoder; @Inject - MqttWebSocketInitializer( - final @NotNull MqttClientConfig clientConfig, final @NotNull MqttChannelInitializer mqttChannelInitializer, + MqttWebSocketInitializer(final @NotNull MqttClientConfig clientConfig, + final @NotNull MqttChannelInitializer mqttChannelInitializer, final @NotNull WebSocketBinaryFrameEncoder webSocketBinaryFrameEncoder, final @NotNull WebSocketBinaryFrameDecoder webSocketBinaryFrameDecoder) { @@ -67,30 +70,34 @@ public class MqttWebSocketInitializer extends ChannelInboundHandlerAdapter { this.webSocketBinaryFrameDecoder = webSocketBinaryFrameDecoder; } - public void initChannel(final @NotNull Channel channel, final @NotNull MqttWebSocketConfigImpl webSocketConfig) - throws URISyntaxException { + @Override + public void exceptionCaught(final @NotNull ChannelHandlerContext ctx, final @NotNull Throwable cause) { + mqttChannelInitializer.exceptionCaught(ctx, cause); + } + + public void initChannel(final @NotNull Channel channel, final @NotNull MqttWebSocketConfigImpl webSocketConfig) throws URISyntaxException { final HttpClientCodec httpCodec = new HttpClientCodec(); - final HttpObjectAggregator httpAggregator = - new HttpObjectAggregator(MqttVariableByteInteger.MAXIMUM_PACKET_SIZE_LIMIT); - - final URI uri = new URI((clientConfig.getTransportConfig().getRawSslConfig() == null) ? WEBSOCKET_URI_SCHEME : - WEBSOCKET_TLS_URI_SCHEME, null, clientConfig.getServerHost(), clientConfig.getServerPort(), - "/" + webSocketConfig.getServerPath(), null, null); - - final WebSocketClientProtocolHandler webSocketClientProtocolHandler = - new WebSocketClientProtocolHandler(uri, WebSocketVersion.V13, webSocketConfig.getSubprotocol(), true, - null, MqttVariableByteInteger.MAXIMUM_PACKET_SIZE_LIMIT); - - channel.pipeline() - .addLast(HTTP_CODEC_NAME, httpCodec) - .addLast(HTTP_AGGREGATOR_NAME, httpAggregator) - .addLast(PROTOCOL_HANDLER_NAME, webSocketClientProtocolHandler) - .addLast(NAME, this) + final HttpObjectAggregator httpAggregator = new HttpObjectAggregator(MqttVariableByteInteger.MAXIMUM_PACKET_SIZE_LIMIT); + + final URI uri = new URI((clientConfig.getTransportConfig().getRawSslConfig() == null) ? WEBSOCKET_URI_SCHEME : WEBSOCKET_TLS_URI_SCHEME, null, + clientConfig.getServerHost(), clientConfig.getServerPort(), "/" + webSocketConfig.getServerPath(), webSocketConfig.getQueryString(), + null); + + final WebSocketClientProtocolHandler webSocketClientProtocolHandler = new WebSocketClientProtocolHandler(uri, WebSocketVersion.V13, + webSocketConfig.getSubprotocol(), true, null, MqttVariableByteInteger.MAXIMUM_PACKET_SIZE_LIMIT); + + channel.pipeline().addLast(HTTP_CODEC_NAME, httpCodec).addLast(HTTP_AGGREGATOR_NAME, httpAggregator) + .addLast(PROTOCOL_HANDLER_NAME, webSocketClientProtocolHandler).addLast(NAME, this) .addLast(WebSocketBinaryFrameEncoder.NAME, webSocketBinaryFrameEncoder) .addLast(WebSocketBinaryFrameDecoder.NAME, webSocketBinaryFrameDecoder); } + @Override + public boolean isSharable() { + return false; + } + @Override public void userEventTriggered(final @NotNull ChannelHandlerContext ctx, final @NotNull Object evt) { if (evt == WebSocketClientProtocolHandler.ClientHandshakeStateEvent.HANDSHAKE_COMPLETE) { @@ -100,14 +107,4 @@ public void userEventTriggered(final @NotNull ChannelHandlerContext ctx, final @ ctx.fireUserEventTriggered(evt); } } - - @Override - public void exceptionCaught(final @NotNull ChannelHandlerContext ctx, final @NotNull Throwable cause) { - mqttChannelInitializer.exceptionCaught(ctx, cause); - } - - @Override - public boolean isSharable() { - return false; - } } diff --git a/src/main/java/com/hivemq/client/mqtt/MqttWebSocketConfig.java b/src/main/java/com/hivemq/client/mqtt/MqttWebSocketConfig.java index 551cfc13e..f2d8ee609 100644 --- a/src/main/java/com/hivemq/client/mqtt/MqttWebSocketConfig.java +++ b/src/main/java/com/hivemq/client/mqtt/MqttWebSocketConfig.java @@ -16,9 +16,10 @@ package com.hivemq.client.mqtt; +import org.jetbrains.annotations.NotNull; + import com.hivemq.client.annotations.DoNotImplement; import com.hivemq.client.internal.mqtt.MqttWebSocketConfigImplBuilder; -import org.jetbrains.annotations.NotNull; /** * Configuration for a WebSocket transport to use by {@link MqttClient MQTT clients}. @@ -33,14 +34,21 @@ public interface MqttWebSocketConfig { /** * The default WebSocket server path. */ - @NotNull String DEFAULT_SERVER_PATH = ""; + @NotNull + String DEFAULT_SERVER_PATH = ""; + /** + * The default WebSocket query string. + */ + @NotNull + String DEFAULT_QUERY_STRING = ""; /** * The default WebSocket subprotocol. *

* See the WebSocket Subprotocol * Name Registry */ - @NotNull String DEFAULT_MQTT_SUBPROTOCOL = "mqtt"; + @NotNull + String DEFAULT_MQTT_SUBPROTOCOL = "mqtt"; /** * Creates a builder for a WebSocket configuration. @@ -52,20 +60,29 @@ public interface MqttWebSocketConfig { } /** - * @return the WebSocket server path. + * Creates a builder for extending this WebSocket configuration. + * + * @return the created builder. + * @since 1.1 */ - @NotNull String getServerPath(); + @NotNull + MqttWebSocketConfigBuilder extend(); /** - * @return the WebSocket subprotocol. + * @return the WebSocket query string. */ - @NotNull String getSubprotocol(); + @NotNull + String getQueryString(); /** - * Creates a builder for extending this WebSocket configuration. - * - * @return the created builder. - * @since 1.1 + * @return the WebSocket server path. + */ + @NotNull + String getServerPath(); + + /** + * @return the WebSocket subprotocol. */ - @NotNull MqttWebSocketConfigBuilder extend(); + @NotNull + String getSubprotocol(); }