Skip to content

Commit

Permalink
[improve][test] Add integration test for websocket (#17843)
Browse files Browse the repository at this point in the history
  • Loading branch information
Andras Beni authored Sep 28, 2022
1 parent 31203c3 commit 0678b82
Show file tree
Hide file tree
Showing 11 changed files with 311 additions and 137 deletions.
4 changes: 2 additions & 2 deletions build/run_integration_group.sh
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,8 @@ test_group_messaging() {
mvn_run_integration_test "$@" -DintegrationTestSuiteFile=pulsar-messaging.xml -DintegrationTests
# run integration proxy tests
mvn_run_integration_test --skip-build-deps "$@" -DintegrationTestSuiteFile=pulsar-proxy.xml -DintegrationTests
# run integration proxy with WebSocket tests
mvn_run_integration_test --skip-build-deps "$@" -DintegrationTestSuiteFile=pulsar-proxy-websocket.xml -DintegrationTests
# run integration WebSocket tests
mvn_run_integration_test --skip-build-deps "$@" -DintegrationTestSuiteFile=pulsar-websocket.xml -DintegrationTests
}

test_group_plugin() {
Expand Down
4 changes: 2 additions & 2 deletions tests/docker-images/latest-version-image/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ RUN mkdir -p /var/log/pulsar && mkdir -p /var/run/supervisor/ && mkdir -p /pulsa

COPY conf/supervisord.conf /etc/supervisord.conf
COPY conf/global-zk.conf conf/local-zk.conf conf/bookie.conf conf/broker.conf conf/functions_worker.conf \
conf/proxy.conf conf/presto_worker.conf /etc/supervisord/conf.d/
conf/proxy.conf conf/presto_worker.conf conf/websocket.conf /etc/supervisord/conf.d/

COPY ssl/ca.cert.pem ssl/broker.key-pk8.pem ssl/broker.cert.pem \
ssl/admin.key-pk8.pem ssl/admin.cert.pem \
Expand All @@ -81,7 +81,7 @@ COPY ssl/ca.cert.pem ssl/broker.key-pk8.pem ssl/broker.cert.pem \

COPY scripts/init-cluster.sh scripts/run-global-zk.sh scripts/run-local-zk.sh \
scripts/run-bookie.sh scripts/run-broker.sh scripts/run-functions-worker.sh scripts/run-proxy.sh scripts/run-presto-worker.sh \
scripts/run-standalone.sh \
scripts/run-standalone.sh scripts/run-websocket.sh \
/pulsar/bin/

COPY conf/presto/jvm.config /pulsar/trino/conf
Expand Down
27 changes: 27 additions & 0 deletions tests/docker-images/latest-version-image/conf/websocket.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

[program:websocket]
autostart=false
redirect_stderr=true
stdout_logfile=/var/log/pulsar/pulsar-websocket.log
directory=/pulsar
environment=PULSAR_MEM="-Xmx128M",PULSAR_GC="-XX:+UseZGC"
command=/pulsar/bin/pulsar websocket
user=pulsar
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@
# under the License.
#

bin/apply-config-from-env.py conf/proxy.conf && \
bin/apply-config-from-env.py conf/websocket.conf && \
bin/apply-config-from-env.py conf/pulsar_env.sh

if [ -z "$NO_AUTOSTART" ]; then
sed -i 's/autostart=.*/autostart=true/' /etc/supervisord/conf.d/proxy.conf
sed -i 's/autostart=.*/autostart=true/' /etc/supervisord/conf.d/websocket.conf
fi

bin/watch-znode.py -z $zookeeperServers -p /initialized-$clusterName -w
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.tests.integration.containers;

import org.apache.pulsar.tests.integration.utils.DockerUtils;

public class WebSocketContainer extends PulsarContainer<WebSocketContainer> {

public WebSocketContainer(String clusterName, String hostName) {
super(clusterName, hostName, hostName,
"bin/run-websocket.sh",
-1,
BROKER_HTTP_PORT, "/admin/v2/proxy-stats/stats");
}

public String getWSUrl() {
return "ws://" + getHost() + ":" + getMappedPort(BROKER_HTTP_PORT);
}

@Override
protected void afterStart() {
DockerUtils.runCommandAsyncWithLogging(this.dockerClient, this.getContainerId(),
"tail", "-f", "/var/log/pulsar/pulsar-websocket.log");
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ public static PulsarCluster forSpec(PulsarClusterSpec spec, CSContainer csContai
@Getter
private Map<String, PrestoWorkerContainer> sqlFollowWorkerContainers;
private Map<String, GenericContainer<?>> externalServices = Collections.emptyMap();
private Map<String, Map<String, String>> externalServiceEnvs;
private final boolean enablePrestoWorker;

private PulsarCluster(PulsarClusterSpec spec, CSContainer csContainer, boolean sharedCsContainer) {
Expand Down Expand Up @@ -280,14 +281,19 @@ public void start() throws Exception {

// start external services
this.externalServices = spec.externalServices;
this.externalServiceEnvs = spec.externalServiceEnvs;
if (null != externalServices) {
externalServices.entrySet().parallelStream().forEach(service -> {
GenericContainer<?> serviceContainer = service.getValue();
serviceContainer.withNetwork(network);
serviceContainer.withNetworkAliases(service.getKey());
if (null != externalServiceEnvs && null != externalServiceEnvs.get(service.getKey())) {
Map<String, String> env = externalServiceEnvs.getOrDefault(service.getKey(), Collections.emptyMap());
serviceContainer.withEnv(env);
}
PulsarContainer.configureLeaveContainerRunning(serviceContainer);
serviceContainer.start();
log.info("Successfully start external service {}.", service.getKey());
log.info("Successfully started external service {}.", service.getKey());
});
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,12 @@ public class PulsarClusterSpec {
@Singular
Map<String, GenericContainer<?>> externalServices;

/**
* Specify envs for external services.
*/
@Singular
Map<String, Map<String, String>> externalServiceEnvs;

/**
* Returns the flag whether to enable/disable container log.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.tests.integration.websocket;


import com.google.common.collect.ImmutableMap;
import org.apache.pulsar.tests.integration.containers.BrokerContainer;
import org.apache.pulsar.tests.integration.containers.CSContainer;
import org.apache.pulsar.tests.integration.containers.WebSocketContainer;
import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec;
import org.testng.annotations.Test;
import java.util.Collections;
import java.util.Map;

/**
* Test cases for websocket.
*/
public class TestWebSocket extends WebSocketTestSuite {

public static final String WEBSOCKET = "websocket";

@Override
protected PulsarClusterSpec.PulsarClusterSpecBuilder beforeSetupCluster(
String clusterName,
PulsarClusterSpec.PulsarClusterSpecBuilder specBuilder) {

Map<String, String> enableWebSocket = Collections.singletonMap("webSocketServiceEnabled", "true");
specBuilder.brokerEnvs(enableWebSocket);
specBuilder.proxyEnvs(enableWebSocket);

specBuilder.externalService(WEBSOCKET, new WebSocketContainer(clusterName, WEBSOCKET));
specBuilder.externalServiceEnv(WEBSOCKET, ImmutableMap.<String, String>builder()
.put("configurationMetadataStoreUrl", CSContainer.NAME + ":" + CSContainer.CS_PORT)
.put("webServicePort", "" + WebSocketContainer.BROKER_HTTP_PORT)
.put("clusterName", clusterName)
.build());
return super.beforeSetupCluster(clusterName, specBuilder);
}

@Test
public void testExternalService() throws Exception {
WebSocketContainer service = (WebSocketContainer) pulsarCluster.getExternalServices().get(WEBSOCKET);
testWebSocket(service.getWSUrl());
}

@Test
public void testBroker() throws Exception {
BrokerContainer broker = pulsarCluster.getAnyBroker();
String url = "ws://" + broker.getHost() + ":" + broker.getMappedPort(BrokerContainer.BROKER_HTTP_PORT);
testWebSocket(url);
}

@Test
public void testProxy() throws Exception {
String url = pulsarCluster.getProxy().getHttpServiceUrl().replaceFirst("http", "ws");
testWebSocket(url);
}
}
Loading

0 comments on commit 0678b82

Please sign in to comment.