Skip to content

Commit

Permalink
fix branch-3.3
Browse files Browse the repository at this point in the history
  • Loading branch information
Technoboy- committed Nov 19, 2024
1 parent 7d76b42 commit 4cc93c3
Show file tree
Hide file tree
Showing 15 changed files with 120 additions and 496 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,23 +25,19 @@
import com.google.common.collect.ImmutableMap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.streamnative.pulsar.handlers.mqtt.MopVersion;
import io.streamnative.pulsar.handlers.mqtt.broker.channel.MQTTChannelInitializer;
import io.streamnative.pulsar.handlers.mqtt.common.utils.ConfigurationUtils;
import io.streamnative.pulsar.handlers.mqtt.proxy.MQTTProxyConfiguration;
import io.streamnative.pulsar.handlers.mqtt.proxy.MQTTProxyService;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.ServiceConfigurationUtils;
import org.apache.pulsar.broker.protocol.ProtocolHandler;
import org.apache.pulsar.broker.service.BrokerService;

/**
* MQTT Protocol Handler load and run by Pulsar Service.
*/
Expand All @@ -62,8 +58,6 @@ public class MQTTProtocolHandler implements ProtocolHandler {
@Getter
private MQTTService mqttService;

private ScheduledExecutorService sslContextRefresher;

@Override
public String protocolName() {
return PROTOCOL_NAME;
Expand Down Expand Up @@ -120,9 +114,6 @@ public Map<InetSocketAddress, ChannelInitializer<SocketChannel>> newChannelIniti
checkArgument(mqttConfig.getMqttListeners() != null);
checkArgument(brokerService != null);

this.sslContextRefresher = Executors.newSingleThreadScheduledExecutor(
new DefaultThreadFactory("mop-ssl-context-refresher"));

String listeners = mqttConfig.getMqttListeners();
String[] parts = listeners.split(LISTENER_DEL);
try {
Expand All @@ -133,28 +124,27 @@ public Map<InetSocketAddress, ChannelInitializer<SocketChannel>> newChannelIniti
if (listener.startsWith(PLAINTEXT_PREFIX)) {
builder.put(
new InetSocketAddress(brokerService.pulsar().getBindAddress(), getListenerPort(listener)),
new MQTTChannelInitializer(mqttService, false, false, sslContextRefresher));
new MQTTChannelInitializer(mqttService, false, false));

} else if (listener.startsWith(SSL_PREFIX)) {
builder.put(
new InetSocketAddress(brokerService.pulsar().getBindAddress(), getListenerPort(listener)),
new MQTTChannelInitializer(mqttService, true, false, sslContextRefresher));
new MQTTChannelInitializer(mqttService, true, false));

} else if (listener.startsWith(SSL_PSK_PREFIX) && mqttConfig.isMqttTlsPskEnabled()) {
builder.put(
new InetSocketAddress(brokerService.pulsar().getBindAddress(), getListenerPort(listener)),
new MQTTChannelInitializer(
mqttService, false, true, false, sslContextRefresher));
new MQTTChannelInitializer(mqttService, false, true, false));

} else if (listener.startsWith(WS_PLAINTEXT_PREFIX)) {
builder.put(
new InetSocketAddress(brokerService.pulsar().getBindAddress(), getListenerPort(listener)),
new MQTTChannelInitializer(mqttService, false, true, sslContextRefresher));
new MQTTChannelInitializer(mqttService, false, true));

} else if (listener.startsWith(WS_SSL_PREFIX)) {
builder.put(
new InetSocketAddress(brokerService.pulsar().getBindAddress(), getListenerPort(listener)),
new MQTTChannelInitializer(mqttService, true, true, sslContextRefresher));
new MQTTChannelInitializer(mqttService, true, true));

} else {
log.error("MQTT listener {} not supported. supports {}, {} or {}",
Expand All @@ -171,9 +161,6 @@ public Map<InetSocketAddress, ChannelInitializer<SocketChannel>> newChannelIniti

@Override
public void close() {
if (sslContextRefresher != null) {
sslContextRefresher.shutdownNow();
}
if (proxyService != null) {
proxyService.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.codec.mqtt.MqttDecoder;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.timeout.IdleStateHandler;
import io.streamnative.pulsar.handlers.mqtt.broker.MQTTServerConfiguration;
Expand All @@ -32,49 +33,63 @@
import io.streamnative.pulsar.handlers.mqtt.common.adapter.MqttAdapterDecoder;
import io.streamnative.pulsar.handlers.mqtt.common.adapter.MqttAdapterEncoder;
import io.streamnative.pulsar.handlers.mqtt.common.psk.PSKUtils;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.common.util.PulsarSslConfiguration;
import org.apache.pulsar.common.util.PulsarSslFactory;
import org.apache.pulsar.common.util.NettyServerSslContextBuilder;
import org.apache.pulsar.common.util.SslContextAutoRefreshBuilder;
import org.apache.pulsar.common.util.keystoretls.NettySSLContextAutoRefreshBuilder;

/**
* A channel initializer that initialize channels for MQTT protocol.
*/
@Slf4j
public class MQTTChannelInitializer extends ChannelInitializer<SocketChannel> {

private final MQTTServerConfiguration mqttConfig;
private final MQTTService mqttService;
private final boolean enableTls;
private final boolean enableTlsPsk;
private final boolean enableWs;
private PulsarSslFactory sslFactory;
private final boolean tlsEnabledWithKeyStore;

public MQTTChannelInitializer(MQTTService mqttService, boolean enableTls, boolean enableWs,
ScheduledExecutorService sslContextRefresher) throws Exception {
this(mqttService, enableTls, false, enableWs, sslContextRefresher);
private SslContextAutoRefreshBuilder<SslContext> sslCtxRefresher;
private NettySSLContextAutoRefreshBuilder nettySSLContextAutoRefreshBuilder;

public MQTTChannelInitializer(MQTTService mqttService, boolean enableTls, boolean enableWs) {
this(mqttService, enableTls, false, enableWs);
}

public MQTTChannelInitializer(
MQTTService mqttService, boolean enableTls, boolean enableTlsPsk, boolean enableWs,
ScheduledExecutorService sslContextRefresher) throws Exception {
public MQTTChannelInitializer(MQTTService mqttService, boolean enableTls, boolean enableTlsPsk, boolean enableWs) {
super();
this.mqttService = mqttService;
this.mqttConfig = mqttService.getServerConfiguration();
this.enableTls = enableTls;
this.enableTlsPsk = enableTlsPsk;
this.enableWs = enableWs;
this.tlsEnabledWithKeyStore = mqttConfig.isMqttTlsEnabledWithKeyStore();
if (this.enableTls) {
PulsarSslConfiguration sslConfiguration = buildSslConfiguration(mqttConfig);
this.sslFactory = (PulsarSslFactory) Class.forName(mqttConfig.getSslFactoryPlugin())
.getConstructor().newInstance();
this.sslFactory.initialize(sslConfiguration);
this.sslFactory.createInternalSslContext();
if (mqttConfig.getTlsCertRefreshCheckDurationSec() > 0) {
sslContextRefresher.scheduleWithFixedDelay(this::refreshSslContext,
mqttConfig.getTlsCertRefreshCheckDurationSec(),
mqttConfig.getTlsCertRefreshCheckDurationSec(), TimeUnit.SECONDS);
if (tlsEnabledWithKeyStore) {
nettySSLContextAutoRefreshBuilder = new NettySSLContextAutoRefreshBuilder(
mqttConfig.getMqttTlsProvider(),
mqttConfig.getMqttTlsKeyStoreType(),
mqttConfig.getMqttTlsKeyStore(),
mqttConfig.getMqttTlsKeyStorePassword(),
mqttConfig.isMqttTlsAllowInsecureConnection(),
mqttConfig.getMqttTlsTrustStoreType(),
mqttConfig.getMqttTlsTrustStore(),
mqttConfig.getMqttTlsTrustStorePassword(),
mqttConfig.isMqttTlsRequireTrustedClientCertOnConnect(),
mqttConfig.getMqttTlsCiphers(),
mqttConfig.getMqttTlsProtocols(),
mqttConfig.getMqttTlsCertRefreshCheckDurationSec());
} else {
sslCtxRefresher = new NettyServerSslContextBuilder(
null,
mqttConfig.isMqttTlsAllowInsecureConnection(),
mqttConfig.getMqttTlsTrustCertsFilePath(),
mqttConfig.getMqttTlsCertificateFilePath(),
mqttConfig.getMqttTlsKeyFilePath(),
mqttConfig.getMqttTlsCiphers(),
mqttConfig.getMqttTlsProtocols(),
mqttConfig.isMqttTlsRequireTrustedClientCertOnConnect(),
mqttConfig.getMqttTlsCertRefreshCheckDurationSec());
}
}
}
Expand All @@ -83,7 +98,12 @@ public MQTTChannelInitializer(
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addFirst("idleStateHandler", new IdleStateHandler(0, 0, 120));
if (this.enableTls) {
ch.pipeline().addLast(TLS_HANDLER, new SslHandler(sslFactory.createServerSslEngine(ch.alloc())));
if (this.tlsEnabledWithKeyStore) {
ch.pipeline().addLast(TLS_HANDLER,
new SslHandler(nettySSLContextAutoRefreshBuilder.get().createSSLEngine()));
} else {
ch.pipeline().addLast(TLS_HANDLER, sslCtxRefresher.get().newHandler(ch.alloc()));
}
} else if (this.enableTlsPsk) {
ch.pipeline().addLast(TLS_HANDLER,
new SslHandler(PSKUtils.createServerEngine(ch, mqttService.getPskConfiguration())));
Expand Down Expand Up @@ -121,36 +141,4 @@ private void addWsHandler(ChannelPipeline pipeline) {
true, mqttConfig.getWebSocketMaxFrameSize()));
pipeline.addLast(Constants.HANDLER_MQTT_WEB_SOCKET_CODEC, new MqttWebSocketCodec());
}

protected PulsarSslConfiguration buildSslConfiguration(MQTTServerConfiguration config) {
return PulsarSslConfiguration.builder()
.tlsProvider(config.getMqttTlsProvider())
.tlsKeyStoreType(config.getMqttTlsKeyStoreType())
.tlsKeyStorePath(config.getMqttTlsKeyStore())
.tlsKeyStorePassword(config.getMqttTlsKeyStorePassword())
.tlsTrustStoreType(config.getMqttTlsTrustStoreType())
.tlsTrustStorePath(config.getMqttTlsTrustStore())
.tlsTrustStorePassword(config.getMqttTlsTrustStorePassword())
.tlsCiphers(config.getMqttTlsCiphers())
.tlsProtocols(config.getMqttTlsProtocols())
.tlsTrustCertsFilePath(config.getMqttTlsTrustCertsFilePath())
.tlsCertificateFilePath(config.getMqttTlsCertificateFilePath())
.tlsKeyFilePath(config.getMqttTlsKeyFilePath())
.allowInsecureConnection(config.isMqttTlsAllowInsecureConnection())
.requireTrustedClientCertOnConnect(config.isMqttTlsRequireTrustedClientCertOnConnect())
.tlsEnabledWithKeystore(config.isMqttTlsEnabledWithKeyStore())
.tlsCustomParams(config.getSslFactoryPluginParams())
.authData(null)
.serverMode(true)
.build();
}

protected void refreshSslContext() {
try {
this.sslFactory.update();
} catch (Exception e) {
log.error("Failed to refresh SSL context for mqtt channel.", e);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.PositionFactory;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.Topic.PublishContext;
import org.apache.pulsar.client.api.Message;
Expand Down Expand Up @@ -52,7 +52,7 @@ public void completed(Exception exception, long ledgerId, long entryId) {
topic.getName(), ledgerId, entryId);
}
topic.recordAddLatency(System.nanoTime() - startTimeNs, TimeUnit.NANOSECONDS);
positionFuture.complete(PositionFactory.create(ledgerId, entryId));
positionFuture.complete(PositionImpl.get(ledgerId, entryId));
}
recycle();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,7 @@
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.PositionFactory;
import org.apache.bookkeeper.mledger.impl.AckSetStateUtil;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authorization.AuthorizationService;
Expand Down Expand Up @@ -177,9 +176,9 @@ public void processPubAck(MqttAdapterMessage adapter) {
for (int i = 0; i < packet.getBatchSize(); i++) {
ackSets[i] = packet.getBatchIndex() == i ? 0 : 1;
}
position = AckSetStateUtil.createPositionWithAckSet(packet.getLedgerId(), packet.getEntryId(), ackSets);
position = PositionImpl.get(packet.getLedgerId(), packet.getEntryId(), ackSets);
} else {
position = PositionFactory.create(packet.getLedgerId(), packet.getEntryId());
position = PositionImpl.get(packet.getLedgerId(), packet.getEntryId());
}
packet.getConsumer().getSubscription().acknowledgeMessage(Collections.singletonList(position),
CommandAck.AckType.Individual, Collections.emptyMap());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public Map<InetSocketAddress, ChannelInitializer<SocketChannel>> newChannelIniti
new InetSocketAddress(brokerService.pulsar().getBindAddress(),
getProxyListenerPort(listener)),
new MQTTProxyChannelInitializer(
proxyService, proxyConfig, false, false, sslContextRefresher));
proxyService, proxyConfig, false, false));
}
}
return builder.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@
import io.streamnative.pulsar.handlers.mqtt.proxy.handler.PulsarServiceLookupHandler;
import io.streamnative.pulsar.handlers.mqtt.proxy.impl.MQTTProxyException;
import java.io.Closeable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.PulsarService;
Expand Down Expand Up @@ -77,7 +75,6 @@ public class MQTTProxyService implements Closeable {

private DefaultThreadFactory acceptorThreadFactory = new DefaultThreadFactory("mqtt-redirect-acceptor");
private DefaultThreadFactory workerThreadFactory = new DefaultThreadFactory("mqtt-redirect-io");
private ScheduledExecutorService sslContextRefresher;

public MQTTProxyService(BrokerService brokerService, MQTTProxyConfiguration proxyConfig) {
configValid(proxyConfig);
Expand Down Expand Up @@ -110,8 +107,6 @@ public MQTTProxyService(BrokerService brokerService, MQTTProxyConfiguration prox
this.eventCenter = new PulsarEventCenterImpl(brokerService,
proxyConfig.getEventCenterCallbackPoolThreadNum());
this.proxyAdapter = new MQTTProxyAdapter(this);
this.sslContextRefresher = Executors.newSingleThreadScheduledExecutor(
new DefaultThreadFactory("mop-proxy-ssl-context-refresher"));
}

private void configValid(MQTTProxyConfiguration proxyConfig) {
Expand All @@ -125,8 +120,7 @@ public void start() throws MQTTProxyException {
serverBootstrap.group(acceptorGroup, workerGroup);
serverBootstrap.channel(EventLoopUtil.getServerSocketChannelClass(workerGroup));
EventLoopUtil.enableTriggeredMode(serverBootstrap);
serverBootstrap.childHandler(new MQTTProxyChannelInitializer(
this, proxyConfig, false, sslContextRefresher));
serverBootstrap.childHandler(new MQTTProxyChannelInitializer(this, proxyConfig, false));

try {
listenChannel = serverBootstrap.bind(proxyConfig.getMqttProxyPort()).sync().channel();
Expand All @@ -137,8 +131,7 @@ public void start() throws MQTTProxyException {

if (proxyConfig.isMqttProxyTlsEnabled() || proxyConfig.isMqttProxyMTlsAuthenticationEnabled()) {
ServerBootstrap tlsBootstrap = serverBootstrap.clone();
tlsBootstrap.childHandler(new MQTTProxyChannelInitializer(
this, proxyConfig, true, sslContextRefresher));
tlsBootstrap.childHandler(new MQTTProxyChannelInitializer(this, proxyConfig, true));
try {
listenChannelTls = tlsBootstrap.bind(proxyConfig.getMqttProxyTlsPort()).sync().channel();
log.info("Started MQTT Proxy with TLS on {}", listenChannelTls.localAddress());
Expand All @@ -157,8 +150,7 @@ public void start() throws MQTTProxyException {
this.eventService.addListener(pskConfiguration.getEventListener());
// Add channel initializer
ServerBootstrap tlsPskBootstrap = serverBootstrap.clone();
tlsPskBootstrap.childHandler(new MQTTProxyChannelInitializer(
this, proxyConfig, false, true, sslContextRefresher));
tlsPskBootstrap.childHandler(new MQTTProxyChannelInitializer(this, proxyConfig, false, true));
try {
listenChannelTlsPsk = tlsPskBootstrap.bind(proxyConfig.getMqttProxyTlsPskPort()).sync().channel();
log.info("Started MQTT Proxy with TLS-PSK on {}", listenChannelTlsPsk.localAddress());
Expand All @@ -179,7 +171,7 @@ public void start0() throws MQTTProxyException {
if (proxyConfig.isMqttProxyTlsEnabled() || proxyConfig.isMqttProxyMTlsAuthenticationEnabled()) {
ServerBootstrap tlsBootstrap = serverBootstrap.clone();
tlsBootstrap.childHandler(new MQTTProxyChannelInitializer(
this, proxyConfig, true, sslContextRefresher));
this, proxyConfig, true));
try {
listenChannelTls = tlsBootstrap.bind(proxyConfig.getMqttProxyTlsPort()).sync().channel();
log.info("Started MQTT Proxy with TLS on {}", listenChannelTls.localAddress());
Expand All @@ -199,7 +191,7 @@ public void start0() throws MQTTProxyException {
// Add channel initializer
ServerBootstrap tlsPskBootstrap = serverBootstrap.clone();
tlsPskBootstrap.childHandler(new MQTTProxyChannelInitializer(
this, proxyConfig, false, true, sslContextRefresher));
this, proxyConfig, false, true));
try {
listenChannelTlsPsk = tlsPskBootstrap.bind(proxyConfig.getMqttProxyTlsPskPort()).sync().channel();
log.info("Started MQTT Proxy with TLS-PSK on {}", listenChannelTlsPsk.localAddress());
Expand Down Expand Up @@ -230,8 +222,5 @@ public void close() {
}
this.proxyAdapter.shutdown();
this.connectionManager.close();
if (sslContextRefresher != null) {
sslContextRefresher.shutdownNow();
}
}
}
Loading

0 comments on commit 4cc93c3

Please sign in to comment.