Skip to content

Commit

Permalink
Multiple Filter Subjects tuning and testing
Browse files Browse the repository at this point in the history
  • Loading branch information
scottf committed Sep 1, 2023
1 parent a0f43f1 commit 313006a
Show file tree
Hide file tree
Showing 11 changed files with 305 additions and 91 deletions.
46 changes: 23 additions & 23 deletions src/main/java/io/nats/client/api/ConsumerConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -329,11 +329,11 @@ public long getMaxDeliver() {
}

/**
* Gets the filter subject of this consumer configuration. May be comma delimited if there are multiple filter subjects. May be null.
* @return the filter subject.
* Gets the first filter subject of this consumer configuration. Will be null if there are none.
* @return the first filter subject.
*/
public String getFilterSubject() {
return filterSubjects.isEmpty() ? null : String.join(",", filterSubjects);
return filterSubjects.isEmpty() ? null : filterSubjects.get(0);
}

/**
Expand Down Expand Up @@ -673,7 +673,7 @@ public Builder(ConsumerConfiguration cc) {
this.name = cc.name;
this.deliverSubject = cc.deliverSubject;
this.deliverGroup = cc.deliverGroup;
this.filterSubjects = cc.filterSubjects;
this.filterSubjects = new ArrayList<>(cc.filterSubjects);
this.sampleFrequency = cc.sampleFrequency;

this.startTime = cc.startTime;
Expand Down Expand Up @@ -847,26 +847,22 @@ public Builder maxDeliver(long maxDeliver) {
}

/**
* Sets the filter subject or subjects of the ConsumerConfiguration.
* @param filterSubject the filter subject; a comma delimited list is supported
* when connecting to a server version 2.9.10 or later.
* Sets the filter subject of the ConsumerConfiguration.
* @param filterSubject the filter subject
* @return Builder
*/
public Builder filterSubject(String filterSubject) {
String temp = emptyAsNull(filterSubject);
if (temp == null) {
filterSubjects.clear();
}
else {
String[] split = filterSubject.split(",");
for (String s : split) {
temp = emptyAsNull(s.trim());
if (temp != null) {
filterSubjects.add(temp);
}
}
}
return this;
return filterSubjects(Collections.singletonList(filterSubject));
}


/**
* Sets the filter subjects of the ConsumerConfiguration.
* @param filterSubjects the array of filter subjects
* @return Builder
*/
public Builder filterSubjects(String... filterSubjects) {
return filterSubjects(Arrays.asList(filterSubjects));
}

/**
Expand All @@ -876,8 +872,12 @@ public Builder filterSubject(String filterSubject) {
*/
public Builder filterSubjects(List<String> filterSubjects) {
this.filterSubjects.clear();
if (filterSubjects != null && filterSubjects.isEmpty()) {
this.filterSubjects.addAll(filterSubjects);
if (filterSubjects != null) {
for (String fs : filterSubjects) {
if (!nullOrEmpty(fs)) {
this.filterSubjects.add(fs);
}
}
}
return this;
}
Expand Down
46 changes: 41 additions & 5 deletions src/main/java/io/nats/client/api/OrderedConsumerConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,18 @@
package io.nats.client.api;

import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import static io.nats.client.support.Validator.emptyOrNullAs;
import static io.nats.client.support.Validator.emptyAsNull;

public class OrderedConsumerConfiguration {

public static String DEFAULT_FILTER_SUBJECT = ">";

private String filterSubject;
private final List<String> filterSubjects;
private DeliverPolicy deliverPolicy;
private Long startSequence;
private ZonedDateTime startTime;
Expand All @@ -35,7 +39,8 @@ public class OrderedConsumerConfiguration {
*/
public OrderedConsumerConfiguration() {
startSequence = ConsumerConfiguration.LONG_UNSET;
filterSubject = DEFAULT_FILTER_SUBJECT;
filterSubjects = new ArrayList<>();
filterSubjects.add(DEFAULT_FILTER_SUBJECT);
}

/**
Expand All @@ -44,7 +49,34 @@ public OrderedConsumerConfiguration() {
* @return Builder
*/
public OrderedConsumerConfiguration filterSubject(String filterSubject) {
this.filterSubject = emptyOrNullAs(filterSubject, DEFAULT_FILTER_SUBJECT);
return filterSubjects(Collections.singletonList(filterSubject));
}

/**
* Sets the filter subjects of the OrderedConsumerConfiguration.
* @param filterSubject the filter subject
* @return Builder
*/
public OrderedConsumerConfiguration filterSubjects(String... filterSubject) {
return filterSubjects(Arrays.asList(filterSubject));
}

/**
* Sets the filter subject of the OrderedConsumerConfiguration.
* @param filterSubjects one or more filter subjects
* @return Builder
*/
public OrderedConsumerConfiguration filterSubjects(List<String> filterSubjects) {
this.filterSubjects.clear();
for (String fs : filterSubjects) {
String fsean = emptyAsNull(fs);
if (fsean != null) {
this.filterSubjects.add(fsean);
}
}
if (this.filterSubjects.isEmpty()) {
this.filterSubjects.add(DEFAULT_FILTER_SUBJECT);
}
return this;
}

Expand Down Expand Up @@ -100,7 +132,11 @@ public OrderedConsumerConfiguration headersOnly(Boolean headersOnly) {
}

public String getFilterSubject() {
return filterSubject;
return filterSubjects.get(0);
}

public List<String> getFilterSubjects() {
return filterSubjects;
}

public DeliverPolicy getDeliverPolicy() {
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/nats/client/impl/NatsConsumerContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public class NatsConsumerContext implements ConsumerContext, SimplifiedSubscript
ordered = true;
consumerName = null;
originalOrderedCc = ConsumerConfiguration.builder()
.filterSubject(config.getFilterSubject())
.filterSubjects(config.getFilterSubjects())
.deliverPolicy(config.getDeliverPolicy())
.startSequence(config.getStartSequence())
.startTime(config.getStartTime())
Expand Down
Loading

0 comments on commit 313006a

Please sign in to comment.