Skip to content

Commit

Permalink
Server urls connection management (#648)
Browse files Browse the repository at this point in the history
  • Loading branch information
scottf authored Jun 1, 2022
1 parent 528f6b0 commit 8ff84be
Show file tree
Hide file tree
Showing 4 changed files with 406 additions and 65 deletions.
154 changes: 132 additions & 22 deletions src/main/java/io/nats/client/Options.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,23 @@
* starting with the {@link Options.Builder Builder}, since it has a simple list of methods that configure the connection.
*/
public class Options {
// NOTE TO DEVS
// To add an option, you have to:
// * add property
// * add field in builder
// * add field in options
// * add a chainable method in builder
// * add update build
// * update constructor that takes properties
// * optional default in statics

// ----------------------------------------------------------------------------------------------------
// NOTE TO DEVS!!! To add an option, you have to address:
// ----------------------------------------------------------------------------------------------------
// CONSTANTS * optionally add a default value constant
// ENVIRONMENT * most of the time add an environment property
// CLASS VARIABLES * add a variable to the class
// BUILDER VARIABLES * add a variable in builder
// BUILD CONSTRUCTOR PROPS * update build props constructor to read new props
// BUILDER METHODS * add a chainable method in builder for new variable
// BUILD IMPL * update build() implementation if needed
// CONSTRUCTOR * update constructor to ensure new variables are set from builder
// GETTERS * update getter to be able to retrieve class variable value
// ----------------------------------------------------------------------------------------------------

// ----------------------------------------------------------------------------------------------------
// CONSTANTS
// ----------------------------------------------------------------------------------------------------
/**
* Default server URL.
*
Expand Down Expand Up @@ -216,6 +223,9 @@ public class Options {
*/
public static final boolean DEFAULT_DISCARD_MESSAGES_WHEN_OUTGOING_QUEUE_FULL = false;

// ----------------------------------------------------------------------------------------------------
// ENVIRONMENT
// ----------------------------------------------------------------------------------------------------
static final String PFX = "io.nats.client.";

/**
Expand Down Expand Up @@ -472,7 +482,24 @@ public class Options {
*/
static final String OPTION_NORESPONDERS = "no_responders";

private final List<URI> servers;
/**
* Property used to set the whether to ignore discovered servers when connecting
*/
public static final String PROP_IGNORE_DISCOVERED_SERVERS = "ignore_discovered_servers";

/**
* Property used to set class name for ServerListProvider implementation
* {@link Builder#serverListProvider(ServerListProvider) serverListProvider}.
*
* IMPORTANT! ServerListProvider IS CURRENTLY EXPERIMENTAL AND SUBJECT TO CHANGE.
*/
public static final String PROP_SERVERS_LIST_PROVIDER_CLASS = "servers_list_provider_class";

// ----------------------------------------------------------------------------------------------------
// CLASS VARIABLES
// ----------------------------------------------------------------------------------------------------
private final List<URI> serverURIs;
private final List<String> unprocessedServers;
private final boolean noRandomize;
private final String connectionName;
private final boolean verbose;
Expand Down Expand Up @@ -501,6 +528,7 @@ public class Options {
private final boolean utf8Support;
private final int maxMessagesInOutgoingQueue;
private final boolean discardMessagesWhenOutgoingQueueFull;
private final boolean ignoreDiscoveredServers;

private final AuthHandler authHandler;
private final ReconnectDelayHandler reconnectDelayHandler;
Expand All @@ -513,6 +541,7 @@ public class Options {
private final boolean traceConnection;

private final ExecutorService executor;
private final ServerListProvider serverListProvider;

static class DefaultThreadFactory implements ThreadFactory {
String name;
Expand Down Expand Up @@ -547,7 +576,11 @@ public Thread newThread(Runnable r) {
*/
public static class Builder {

private final ArrayList<URI> servers = new ArrayList<>();
// ----------------------------------------------------------------------------------------------------
// BUILDER VARIABLES
// ----------------------------------------------------------------------------------------------------
private final List<URI> serverURIs = new ArrayList<>();
private final List<String> unprocessedServers = new ArrayList<>();
private boolean noRandomize = false;
private String connectionName = null; // Useful for debugging -> "test: " + NatsTestServer.currentPort();
private boolean verbose = false;
Expand Down Expand Up @@ -578,6 +611,8 @@ public static class Builder {
private String inboxPrefix = DEFAULT_INBOX_PREFIX;
private int maxMessagesInOutgoingQueue = DEFAULT_MAX_MESSAGES_IN_OUTGOING_QUEUE;
private boolean discardMessagesWhenOutgoingQueueFull = DEFAULT_DISCARD_MESSAGES_WHEN_OUTGOING_QUEUE_FULL;
private boolean ignoreDiscoveredServers = false;
private ServerListProvider serverListProvider = null;

private AuthHandler authHandler;
private ReconnectDelayHandler reconnectDelayHandler;
Expand All @@ -597,6 +632,9 @@ public static class Builder {
public Builder() {
}

// ----------------------------------------------------------------------------------------------------
// BUILD CONSTRUCTOR PROPS
// ----------------------------------------------------------------------------------------------------
/**
* Constructs a new {@code Builder} from a {@link Properties} object.
*
Expand Down Expand Up @@ -783,6 +821,15 @@ public Builder(Properties props) throws IllegalArgumentException {
this.discardMessagesWhenOutgoingQueueFull = Boolean.parseBoolean(props.getProperty(
PROP_DISCARD_MESSAGES_WHEN_OUTGOING_QUEUE_FULL, Boolean.toString(DEFAULT_DISCARD_MESSAGES_WHEN_OUTGOING_QUEUE_FULL)));
}

if (props.containsKey(PROP_IGNORE_DISCOVERED_SERVERS)) {
this.ignoreDiscoveredServers = Boolean.parseBoolean(props.getProperty(PROP_IGNORE_DISCOVERED_SERVERS));
}

if (props.containsKey(PROP_SERVERS_LIST_PROVIDER_CLASS)) {
Object instance = createInstanceOf(props.getProperty(PROP_SERVERS_LIST_PROVIDER_CLASS));
this.serverListProvider = (ServerListProvider) instance;
}
}

static Object createInstanceOf(String className) {
Expand All @@ -797,6 +844,9 @@ static Object createInstanceOf(String className) {
return instance;
}

// ----------------------------------------------------------------------------------------------------
// BUILDER METHODS
// ----------------------------------------------------------------------------------------------------
/**
* Add a server to the list of known servers.
*
Expand All @@ -805,7 +855,7 @@ static Object createInstanceOf(String className) {
* @return the Builder for chaining
*/
public Builder server(String serverURL) {
return this.servers(serverURL.trim().split(","));
return servers(serverURL.trim().split(","));
}

/**
Expand All @@ -819,7 +869,9 @@ public Builder servers(String[] servers) {
for (String s : servers) {
if (s != null && !s.isEmpty()) {
try {
this.servers.add(Options.parseURIForServer(s.trim()));
String unprocessed = s.trim();
this.serverURIs.add(Options.parseURIForServer(unprocessed));
this.unprocessedServers.add(unprocessed);
} catch (URISyntaxException e) {
throw new IllegalArgumentException("Bad server URL: " + s, e);
}
Expand All @@ -839,9 +891,11 @@ public Builder oldRequestStyle() {
}

/**
* Turn off server pool randomization. By default the server will pick
* servers from its list randomly on a reconnect. When set to noRandom the server
* goes in the order they were configured or provided by a server in a cluster update.
* For the default server list provider, turn off server pool randomization.
* The default provider will pick servers from its list randomly on a reconnect.
* When noRandomize is set to true the default provider supplies a list that
* first contains servers as configured and then contains the servers as sent
* from the connected server.
* @return the Builder for chaining
*/
public Builder noRandomize() {
Expand Down Expand Up @@ -1336,6 +1390,25 @@ public Builder discardMessagesWhenOutgoingQueueFull() {
return this;
}

/**
* Turn off use of discovered servers when connecting / reconnecting. Used in the default server list provider.
* @return the Builder for chaining
*/
public Builder ignoreDiscoveredServers() {
this.ignoreDiscoveredServers = true;
return this;
}

/**
* Set the ServerListProvider implementation for connections to use instead of the default bahvior
* IMPORTANT! ServerListProvider IS CURRENTLY EXPERIMENTAL AND SUBJECT TO CHANGE.
* @return the Builder for chaining
*/
public Builder serverListProvider(ServerListProvider serverListProvider) {
this.serverListProvider = serverListProvider;
return this;
}

/**
* Build an Options object from this Builder.
*
Expand All @@ -1355,16 +1428,18 @@ public Builder discardMessagesWhenOutgoingQueueFull() {
* @throws IllegalStateException if there is a conflict in the options, like a token and a user/pass
*/
public Options build() throws IllegalStateException {

// ----------------------------------------------------------------------------------------------------
// BUILD IMPL
// ----------------------------------------------------------------------------------------------------
if (this.username != null && this.token != null) {
throw new IllegalStateException("Options can't have token and username");
}

if (servers.size() == 0) {
if (serverURIs.size() == 0) {
server(DEFAULT_URL);
}
else if (sslContext == null) {
for (URI serverURI : servers) {
for (URI serverURI : serverURIs) {
if (TLS_PROTOCOL.equals(serverURI.getScheme())) {
try {
this.sslContext = SSLContext.getDefault();
Expand All @@ -1391,8 +1466,12 @@ else if (OPENTLS_PROTOCOL.equals(serverURI.getScheme())) {
}
}

// ----------------------------------------------------------------------------------------------------
// CONSTRUCTOR
// ----------------------------------------------------------------------------------------------------
private Options(Builder b) {
this.servers = b.servers;
this.serverURIs = new ArrayList<>(b.serverURIs); // builder servers is a set so no dupes
this.unprocessedServers = b.unprocessedServers; // exactly how the user gave them
this.noRandomize = b.noRandomize;
this.connectionName = b.connectionName;
this.verbose = b.verbose;
Expand Down Expand Up @@ -1431,8 +1510,15 @@ private Options(Builder b) {
this.dataPortType = b.dataPortType;
this.trackAdvancedStats = b.trackAdvancedStats;
this.executor = b.executor;

this.ignoreDiscoveredServers = b.ignoreDiscoveredServers;

this.serverListProvider = b.serverListProvider;
}

// ----------------------------------------------------------------------------------------------------
// GETTERS
// ----------------------------------------------------------------------------------------------------
/**
* @return the executor, see {@link Builder#executor(ExecutorService) executor()} in the builder doc
*/
Expand Down Expand Up @@ -1486,7 +1572,14 @@ public DataPort buildDataPort() {
* @return the servers stored in this options, see {@link Builder#servers(String[]) servers()} in the builder doc
*/
public Collection<URI> getServers() {
return servers;
return serverURIs;
}

/**
* @return the servers as given to the options, since the servers are normalized
*/
public List<String> getUnprocessedServers() {
return unprocessedServers;
}

/**
Expand Down Expand Up @@ -1739,6 +1832,23 @@ public boolean isDiscardMessagesWhenOutgoingQueueFull() {
return discardMessagesWhenOutgoingQueueFull;
}

/**
* Get whether to ignore discovered servers
* @return the flag
*/
public boolean isIgnoreDiscoveredServers() {
return ignoreDiscoveredServers;
}

/**
* Get a provided ServerListProvider. If null, a default implementation is used.
* IMPORTANT! ServerListProvider IS CURRENTLY EXPERIMENTAL AND SUBJECT TO CHANGE.
* @return the ServerListProvider implementation
*/
public ServerListProvider getServerListProvider() {
return serverListProvider;
}

public URI createURIForServer(String serverURI) throws URISyntaxException {
return Options.parseURIForServer(serverURI);
}
Expand Down
34 changes: 34 additions & 0 deletions src/main/java/io/nats/client/ServerListProvider.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Copyright 2020 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package io.nats.client;

import java.util.List;

/**
* Allows the developer to provide the list of servers to try for connecting/reconnecting
*
* IMPORTANT! ServerListProvider IS CURRENTLY EXPERIMENTAL AND SUBJECT TO CHANGE.
*/
public interface ServerListProvider {
/**
* Get the ordered server list to try for connecting/reconnecting
* @param currentServer the server that connection is currently connected to. May be null.
* @param optionsServersUnprocessed the list of server urls exactly how they were given to the options
* @param discoveredServersUnprocessed the entire list of servers exactly as returned in the server info
* @return the ordered server list
*/
List<String> getServerList(String currentServer,
List<String> optionsServersUnprocessed,
List<String> discoveredServersUnprocessed);
}
Loading

0 comments on commit 8ff84be

Please sign in to comment.