Skip to content

Commit

Permalink
Test coverage and add note about websocket to readme. (#956)
Browse files Browse the repository at this point in the history
  • Loading branch information
scottf authored Aug 22, 2023
1 parent bd40a34 commit 06e73f2
Show file tree
Hide file tree
Showing 9 changed files with 313 additions and 154 deletions.
11 changes: 11 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,17 @@ Please see unit test for examples of this behavior.
and
`testMaxPayloadJs` in [JetStreamPubTests.cs](src/test/java/io/nats/client/impl/JetStreamPubTests.java)

#### Version 2.16.8 Websocket Support

As of version 2.16.8 Websocket (`ws` and `wss`) protocols are supported for connecting to the server.
For instance, your server bootstrap url might be `ws://my-nats-host:80` or `wss://my-nats-host:443`.

Your server must be properly configured for websocket, see the NATS.IO docs
[WebSocket Configuration Example](https://docs.nats.io/running-a-nats-service/configuration/websocket/websocket_conf)
for more information.

If you use secure websockets (wss), your connection must be securely configured in the same way you would configure a `tls` connection.

#### Version 2.16.0 Consumer Create

This release by default will use a new JetStream consumer create API when interacting with nats-server version 2.9.0 or higher.
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/io/nats/client/BaseConsumerContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public interface BaseConsumerContext {

/**
* Read the next message with provided max wait
* @param maxWait duration of max wait
* @param maxWait duration of max wait. Cannot be less than {@value BaseConsumeOptions#MIN_EXPIRES_MILLS} milliseconds.
* @return the next message or null if the max wait expires
* @throws IOException covers various communication issues with the NATS
* server such as timeout or interruption
Expand All @@ -48,7 +48,7 @@ public interface BaseConsumerContext {

/**
* Read the next message with provided max wait
* @param maxWaitMillis the max wait value in milliseconds
* @param maxWaitMillis the max wait value in milliseconds. Cannot be less than {@value BaseConsumeOptions#MIN_EXPIRES_MILLS} milliseconds.
* @return the next message or null if the max wait expires
* @throws IOException covers various communication issues with the NATS
* server such as timeout or interruption
Expand Down
4 changes: 1 addition & 3 deletions src/main/java/io/nats/client/impl/NatsConsumerContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,7 @@ public String getConsumerName() {
*/
@Override
public ConsumerInfo getConsumerInfo() throws IOException, JetStreamApiException {
if (consumerName != null) {
cachedConsumerInfo = streamCtx.jsm.getConsumerInfo(streamCtx.streamName, cachedConsumerInfo.getName());
}
cachedConsumerInfo = streamCtx.jsm.getConsumerInfo(streamCtx.streamName, consumerName);
return cachedConsumerInfo;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@
* Implementation of Ordered Consumer Context
*/
public class NatsOrderedConsumerContext implements OrderedConsumerContext {
NatsConsumerContext impl;
private NatsConsumerContext impl;

NatsOrderedConsumerContext(NatsStreamContext streamContext, OrderedConsumerConfiguration config) {
impl = new NatsConsumerContext(streamContext, config);
}
Expand Down
33 changes: 30 additions & 3 deletions src/test/java/io/nats/client/impl/JetStreamTestBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,30 @@ public NatsMessage getTestMessage(String replyTo, String sid) {
// ----------------------------------------------------------------------------------------------------
// Management
// ----------------------------------------------------------------------------------------------------
public static StreamInfo createMemoryStream(JetStreamManagement jsm, String streamName, String... subjects)
throws IOException, JetStreamApiException {
public static class CreateStreamResult {
public final String stream = stream();
public final String subject = subject();
public StreamInfo si;

public CreateStreamResult info(StreamInfo si) {
this.si = si;
return this;
}
}

public static CreateStreamResult createMemoryStream(JetStreamManagement jsm) throws IOException, JetStreamApiException {
CreateStreamResult csr = new CreateStreamResult();
return csr.info(createMemoryStream(jsm, csr.stream, csr.subject));
}

public static StreamInfo createMemoryStream(JetStreamManagement jsm, String streamName, String... subjects) throws IOException, JetStreamApiException {
if (streamName == null) {
streamName = stream();
}

if (subjects == null || subjects.length == 0) {
subjects = new String[]{subject()};
}

StreamConfiguration sc = StreamConfiguration.builder()
.name(streamName)
Expand All @@ -90,8 +112,13 @@ public static StreamInfo createMemoryStream(JetStreamManagement jsm, String stre
return jsm.addStream(sc);
}

public static CreateStreamResult createMemoryStream(Connection nc)
throws IOException, JetStreamApiException {
return createMemoryStream(nc.jetStreamManagement());
}

public static StreamInfo createMemoryStream(Connection nc, String streamName, String... subjects)
throws IOException, JetStreamApiException {
throws IOException, JetStreamApiException {
return createMemoryStream(nc.jetStreamManagement(), streamName, subjects);
}

Expand Down
Loading

0 comments on commit 06e73f2

Please sign in to comment.