Skip to content

Commit

Permalink
Issue hivemq#335
Browse files Browse the repository at this point in the history
  • Loading branch information
alsanrum committed Oct 9, 2019
1 parent a755079 commit 8a60245
Show file tree
Hide file tree
Showing 4 changed files with 142 additions and 107 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,40 +16,29 @@

package com.hivemq.client.internal.mqtt;

import com.hivemq.client.mqtt.MqttWebSocketConfig;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import com.hivemq.client.mqtt.MqttWebSocketConfig;

/**
* @author David Katz
* @author Christian Hoff
*/
public class MqttWebSocketConfigImpl implements MqttWebSocketConfig {

static final @NotNull MqttWebSocketConfigImpl DEFAULT =
new MqttWebSocketConfigImpl(DEFAULT_SERVER_PATH, DEFAULT_MQTT_SUBPROTOCOL);
static final @NotNull MqttWebSocketConfigImpl DEFAULT = new MqttWebSocketConfigImpl(DEFAULT_SERVER_PATH, DEFAULT_QUERY_STRING,
DEFAULT_MQTT_SUBPROTOCOL);

private final @NotNull String serverPath;
private final @NotNull String queryString;
private final @NotNull String subprotocol;

MqttWebSocketConfigImpl(final @NotNull String serverPath, final @NotNull String subprotocol) {
MqttWebSocketConfigImpl(final @NotNull String serverPath, final @NotNull String queryString, final @NotNull String subprotocol) {
this.serverPath = serverPath;
this.queryString = queryString;
this.subprotocol = subprotocol;
}

@Override
public @NotNull String getServerPath() {
return serverPath;
}

@Override
public @NotNull String getSubprotocol() {
return subprotocol;
}

@Override
public @NotNull MqttWebSocketConfigImplBuilder.Default extend() {
return new MqttWebSocketConfigImplBuilder.Default(this);
}

@Override
Expand All @@ -65,6 +54,26 @@ public boolean equals(final @Nullable Object o) {
return serverPath.equals(that.serverPath) && subprotocol.equals(that.subprotocol);
}

@Override
public @NotNull MqttWebSocketConfigImplBuilder.Default extend() {
return new MqttWebSocketConfigImplBuilder.Default(this);
}

@Override
public @NotNull String getQueryString() {
return queryString;
}

@Override
public @NotNull String getServerPath() {
return serverPath;
}

@Override
public @NotNull String getSubprotocol() {
return subprotocol;
}

@Override
public int hashCode() {
int result = serverPath.hashCode();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,82 +17,94 @@

package com.hivemq.client.internal.mqtt;

import com.hivemq.client.internal.util.Checks;
import com.hivemq.client.mqtt.MqttWebSocketConfigBuilder;
import java.util.function.Function;

import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.util.function.Function;
import com.hivemq.client.internal.util.Checks;
import com.hivemq.client.mqtt.MqttWebSocketConfigBuilder;

/**
* @author Silvio Giebl
*/
public abstract class MqttWebSocketConfigImplBuilder<B extends MqttWebSocketConfigImplBuilder<B>> {

private @NotNull String serverPath = MqttWebSocketConfigImpl.DEFAULT_SERVER_PATH;
private @NotNull String subprotocol = MqttWebSocketConfigImpl.DEFAULT_MQTT_SUBPROTOCOL;

MqttWebSocketConfigImplBuilder() {}

MqttWebSocketConfigImplBuilder(final @Nullable MqttWebSocketConfigImpl webSocketConfig) {
if (webSocketConfig != null) {
serverPath = webSocketConfig.getServerPath();
subprotocol = webSocketConfig.getSubprotocol();
}
}

abstract @NotNull B self();

public @NotNull B serverPath(final @Nullable String serverPath) {
// remove any leading slashes
this.serverPath = Checks.notNull(serverPath, "Server path").replaceAll("^/+", "");
return self();
}

public @NotNull B subprotocol(final @Nullable String subprotocol) {
this.subprotocol = Checks.notNull(subprotocol, "Subprotocol");
return self();
}

public @NotNull MqttWebSocketConfigImpl build() {
return new MqttWebSocketConfigImpl(serverPath, subprotocol);
}

public static class Default extends MqttWebSocketConfigImplBuilder<Default> implements MqttWebSocketConfigBuilder {

public Default() {}
public Default() {
}

Default(final @Nullable MqttWebSocketConfigImpl webSocketConfig) {
super(webSocketConfig);
}

@Override
@NotNull Default self() {
@NotNull
Default self() {
return this;
}
}

public static class Nested<P> extends MqttWebSocketConfigImplBuilder<Nested<P>>
implements MqttWebSocketConfigBuilder.Nested<P> {
public static class Nested<P> extends MqttWebSocketConfigImplBuilder<Nested<P>> implements MqttWebSocketConfigBuilder.Nested<P> {

private final @NotNull Function<? super MqttWebSocketConfigImpl, P> parentConsumer;

public Nested(
final @Nullable MqttWebSocketConfigImpl webSocketConfig,
public Nested(final @Nullable MqttWebSocketConfigImpl webSocketConfig,
final @NotNull Function<? super MqttWebSocketConfigImpl, P> parentConsumer) {

super(webSocketConfig);
this.parentConsumer = parentConsumer;
}

@Override
@NotNull Nested<P> self() {
return this;
public @NotNull P applyWebSocketConfig() {
return parentConsumer.apply(build());
}

@Override
public @NotNull P applyWebSocketConfig() {
return parentConsumer.apply(build());
@NotNull
Nested<P> self() {
return this;
}
}

private @NotNull String serverPath = MqttWebSocketConfigImpl.DEFAULT_SERVER_PATH;

private @NotNull String queryString = MqttWebSocketConfigImpl.DEFAULT_QUERY_STRING;

private @NotNull String subprotocol = MqttWebSocketConfigImpl.DEFAULT_MQTT_SUBPROTOCOL;

MqttWebSocketConfigImplBuilder() {
}

MqttWebSocketConfigImplBuilder(final @Nullable MqttWebSocketConfigImpl webSocketConfig) {
if (webSocketConfig != null) {
serverPath = webSocketConfig.getServerPath();
queryString = webSocketConfig.getQueryString();
subprotocol = webSocketConfig.getSubprotocol();
}
}

public @NotNull MqttWebSocketConfigImpl build() {
return new MqttWebSocketConfigImpl(serverPath, queryString, subprotocol);
}

public @NotNull B queryString(final @Nullable String queryString) {
this.queryString = Checks.notNull(queryString, "QueryString");
return self();
}

public @NotNull B serverPath(final @Nullable String serverPath) {
// remove any leading slashes
this.serverPath = Checks.notNull(serverPath, "Server path").replaceAll("^/+", "");
return self();
}

public @NotNull B subprotocol(final @Nullable String subprotocol) {
this.subprotocol = Checks.notNull(subprotocol, "Subprotocol");
return self();
}

abstract @NotNull B self();
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,26 @@

package com.hivemq.client.internal.mqtt.handler.websocket;

import java.net.URI;
import java.net.URISyntaxException;

import javax.inject.Inject;

import org.jetbrains.annotations.NotNull;

import com.hivemq.client.internal.mqtt.MqttClientConfig;
import com.hivemq.client.internal.mqtt.MqttWebSocketConfigImpl;
import com.hivemq.client.internal.mqtt.datatypes.MqttVariableByteInteger;
import com.hivemq.client.internal.mqtt.handler.MqttChannelInitializer;
import com.hivemq.client.internal.mqtt.ioc.ConnectionScope;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.websocketx.WebSocketClientProtocolHandler;
import io.netty.handler.codec.http.websocketx.WebSocketVersion;
import org.jetbrains.annotations.NotNull;

import javax.inject.Inject;
import java.net.URI;
import java.net.URISyntaxException;

/**
* @author Silvio Giebl
Expand All @@ -56,8 +59,8 @@ public class MqttWebSocketInitializer extends ChannelInboundHandlerAdapter {
private final @NotNull WebSocketBinaryFrameDecoder webSocketBinaryFrameDecoder;

@Inject
MqttWebSocketInitializer(
final @NotNull MqttClientConfig clientConfig, final @NotNull MqttChannelInitializer mqttChannelInitializer,
MqttWebSocketInitializer(final @NotNull MqttClientConfig clientConfig,
final @NotNull MqttChannelInitializer mqttChannelInitializer,
final @NotNull WebSocketBinaryFrameEncoder webSocketBinaryFrameEncoder,
final @NotNull WebSocketBinaryFrameDecoder webSocketBinaryFrameDecoder) {

Expand All @@ -67,30 +70,34 @@ public class MqttWebSocketInitializer extends ChannelInboundHandlerAdapter {
this.webSocketBinaryFrameDecoder = webSocketBinaryFrameDecoder;
}

public void initChannel(final @NotNull Channel channel, final @NotNull MqttWebSocketConfigImpl webSocketConfig)
throws URISyntaxException {
@Override
public void exceptionCaught(final @NotNull ChannelHandlerContext ctx, final @NotNull Throwable cause) {
mqttChannelInitializer.exceptionCaught(ctx, cause);
}

public void initChannel(final @NotNull Channel channel, final @NotNull MqttWebSocketConfigImpl webSocketConfig) throws URISyntaxException {

final HttpClientCodec httpCodec = new HttpClientCodec();
final HttpObjectAggregator httpAggregator =
new HttpObjectAggregator(MqttVariableByteInteger.MAXIMUM_PACKET_SIZE_LIMIT);

final URI uri = new URI((clientConfig.getTransportConfig().getRawSslConfig() == null) ? WEBSOCKET_URI_SCHEME :
WEBSOCKET_TLS_URI_SCHEME, null, clientConfig.getServerHost(), clientConfig.getServerPort(),
"/" + webSocketConfig.getServerPath(), null, null);

final WebSocketClientProtocolHandler webSocketClientProtocolHandler =
new WebSocketClientProtocolHandler(uri, WebSocketVersion.V13, webSocketConfig.getSubprotocol(), true,
null, MqttVariableByteInteger.MAXIMUM_PACKET_SIZE_LIMIT);

channel.pipeline()
.addLast(HTTP_CODEC_NAME, httpCodec)
.addLast(HTTP_AGGREGATOR_NAME, httpAggregator)
.addLast(PROTOCOL_HANDLER_NAME, webSocketClientProtocolHandler)
.addLast(NAME, this)
final HttpObjectAggregator httpAggregator = new HttpObjectAggregator(MqttVariableByteInteger.MAXIMUM_PACKET_SIZE_LIMIT);

final URI uri = new URI((clientConfig.getTransportConfig().getRawSslConfig() == null) ? WEBSOCKET_URI_SCHEME : WEBSOCKET_TLS_URI_SCHEME, null,
clientConfig.getServerHost(), clientConfig.getServerPort(), "/" + webSocketConfig.getServerPath(), webSocketConfig.getQueryString(),
null);

final WebSocketClientProtocolHandler webSocketClientProtocolHandler = new WebSocketClientProtocolHandler(uri, WebSocketVersion.V13,
webSocketConfig.getSubprotocol(), true, null, MqttVariableByteInteger.MAXIMUM_PACKET_SIZE_LIMIT);

channel.pipeline().addLast(HTTP_CODEC_NAME, httpCodec).addLast(HTTP_AGGREGATOR_NAME, httpAggregator)
.addLast(PROTOCOL_HANDLER_NAME, webSocketClientProtocolHandler).addLast(NAME, this)
.addLast(WebSocketBinaryFrameEncoder.NAME, webSocketBinaryFrameEncoder)
.addLast(WebSocketBinaryFrameDecoder.NAME, webSocketBinaryFrameDecoder);
}

@Override
public boolean isSharable() {
return false;
}

@Override
public void userEventTriggered(final @NotNull ChannelHandlerContext ctx, final @NotNull Object evt) {
if (evt == WebSocketClientProtocolHandler.ClientHandshakeStateEvent.HANDSHAKE_COMPLETE) {
Expand All @@ -100,14 +107,4 @@ public void userEventTriggered(final @NotNull ChannelHandlerContext ctx, final @
ctx.fireUserEventTriggered(evt);
}
}

@Override
public void exceptionCaught(final @NotNull ChannelHandlerContext ctx, final @NotNull Throwable cause) {
mqttChannelInitializer.exceptionCaught(ctx, cause);
}

@Override
public boolean isSharable() {
return false;
}
}
41 changes: 29 additions & 12 deletions src/main/java/com/hivemq/client/mqtt/MqttWebSocketConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@

package com.hivemq.client.mqtt;

import org.jetbrains.annotations.NotNull;

import com.hivemq.client.annotations.DoNotImplement;
import com.hivemq.client.internal.mqtt.MqttWebSocketConfigImplBuilder;
import org.jetbrains.annotations.NotNull;

/**
* Configuration for a WebSocket transport to use by {@link MqttClient MQTT clients}.
Expand All @@ -33,14 +34,21 @@ public interface MqttWebSocketConfig {
/**
* The default WebSocket server path.
*/
@NotNull String DEFAULT_SERVER_PATH = "";
@NotNull
String DEFAULT_SERVER_PATH = "";
/**
* The default WebSocket query string.
*/
@NotNull
String DEFAULT_QUERY_STRING = "";
/**
* The default WebSocket subprotocol.
* <p>
* See the <a href="https://www.iana.org/assignments/websocket/websocket.xml#subprotocol-name">WebSocket Subprotocol
* Name Registry</a>
*/
@NotNull String DEFAULT_MQTT_SUBPROTOCOL = "mqtt";
@NotNull
String DEFAULT_MQTT_SUBPROTOCOL = "mqtt";

/**
* Creates a builder for a WebSocket configuration.
Expand All @@ -52,20 +60,29 @@ public interface MqttWebSocketConfig {
}

/**
* @return the WebSocket server path.
* Creates a builder for extending this WebSocket configuration.
*
* @return the created builder.
* @since 1.1
*/
@NotNull String getServerPath();
@NotNull
MqttWebSocketConfigBuilder extend();

/**
* @return the WebSocket subprotocol.
* @return the WebSocket query string.
*/
@NotNull String getSubprotocol();
@NotNull
String getQueryString();

/**
* Creates a builder for extending this WebSocket configuration.
*
* @return the created builder.
* @since 1.1
* @return the WebSocket server path.
*/
@NotNull
String getServerPath();

/**
* @return the WebSocket subprotocol.
*/
@NotNull MqttWebSocketConfigBuilder extend();
@NotNull
String getSubprotocol();
}

0 comments on commit 8a60245

Please sign in to comment.