Skip to content

Commit

Permalink
ARTEMIS-5302 extra fixes for using QueueConfiguration more
Browse files Browse the repository at this point in the history
  • Loading branch information
jbertram authored and clebertsuconic committed Feb 14, 2025
1 parent d30a428 commit 458292c
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,11 @@ public static QueueConfiguration of(final QueueConfiguration queueConfiguration)
return new QueueConfiguration(queueConfiguration);
}

/**
* @deprecated
* Use {@link #of(String)} instead.
*/
@Deprecated(forRemoval = true)
public QueueConfiguration() {
}

Expand Down Expand Up @@ -410,15 +415,27 @@ public SimpleString getFilterString() {
return filterString;
}

/**
* This sets the {@code SimpleString} value that will be used to create a {@code Filter} for the {@code Queue}
* implementation on the broker. The filter's syntax is not validated here.
* @param filterString the filter to use; an empty value or a value filled with whitespace is equivalent to passing
* {@code null}
* @return this {@code QueueConfiguration}
*/
public QueueConfiguration setFilterString(SimpleString filterString) {
if (filterString != null && !filterString.isEmpty() && !filterString.isBlank()) {
this.filterString = filterString;
} else if (filterString == null) {
if (filterString == null || filterString.isEmpty() || filterString.isBlank()) {
this.filterString = null;
} else {
this.filterString = filterString;
}
return this;
}

/**
* Converts the {@code String} parameter to {@code SimpleString} and invokes
* {@link #setFilterString(SimpleString)}
* @see #setFilterString(SimpleString)
*/
public QueueConfiguration setFilterString(String filterString) {
return setFilterString(SimpleString.of(filterString));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
*/
package org.apache.activemq.artemis.utils;

import java.util.Arrays;
import java.util.List;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import org.apache.activemq.artemis.api.core.SimpleString;
Expand Down Expand Up @@ -43,20 +46,27 @@ public void testOutOfBoundsThrownOnMalformedString() {

@Test
public void testBlank() {
for (int i = 0; i <= 10; i++) {
assertTrue(SimpleString.of(" ".repeat(i)).isBlank());
}
for (int i = 0; i <= 10; i++) {
assertTrue(SimpleString.of("\t".repeat(i)).isBlank());
}
for (int i = 0; i <= 10; i++) {
assertTrue(SimpleString.of("\n".repeat(i)).isBlank());
}
for (int i = 0; i <= 10; i++) {
assertTrue(SimpleString.of("\r".repeat(i)).isBlank());
List<String> whitespace = Arrays.asList(" ", "\t", "\n", "\r");

// check empty and pure whitespace
for (String s : whitespace) {
for (int i = 0; i <= 10; i++) {
assertTrue(SimpleString.of(s.repeat(i)).isBlank());
}
}

// check pure non-whitespace
for (int i = 1; i <= 10; i++) {
assertFalse(SimpleString.of("x".repeat(i)).isBlank());
}

// check a mix of both whitespace and non-whitepsace
for (String s : whitespace) {
for (int i = 1; i <= 10; i++) {
assertFalse(SimpleString.of(s + "x".repeat(i)).isBlank());
assertFalse(SimpleString.of("x".repeat(i) + s).isBlank());
assertFalse(SimpleString.of(s + "x".repeat(i) + s).isBlank());
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
private List<QueueStatusEncoding> queueStatusEncodings;

public PersistentQueueBindingEncoding() {
config = new QueueConfiguration();
}

@Override
Expand Down Expand Up @@ -67,7 +66,7 @@ public List<QueueStatusEncoding> getQueueStatusEncodings() {

@Override
public void decode(final ActiveMQBuffer buffer) {
config.setName(buffer.readSimpleString());
config = QueueConfiguration.of(buffer.readSimpleString());
config.setAddress(buffer.readSimpleString());
config.setFilterString(buffer.readNullableSimpleString());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
public class LastValueQueue extends QueueImpl {

private final Map<SimpleString, MessageReference> map = new ConcurrentHashMap<>();
private final SimpleString lastValueKey;

public LastValueQueue(final QueueConfiguration queueConfiguration,
final Filter filter,
Expand All @@ -66,7 +65,6 @@ public LastValueQueue(final QueueConfiguration queueConfiguration,
final ActiveMQServer server,
final QueueFactory factory) {
super(queueConfiguration, filter, pagingStore, pageSubscription, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor, server, factory);
this.lastValueKey = queueConfiguration.getLastValueKey();
}

@Override
Expand Down Expand Up @@ -121,11 +119,6 @@ public boolean allowsReferenceCallback() {
return false;
}

@Override
public QueueConfiguration getQueueConfiguration() {
return super.getQueueConfiguration().setLastValue(true).setLastValueKey(lastValueKey);
}

@Override
protected void pruneLastValues() {
// called with synchronized(this) from super.deliver()
Expand Down Expand Up @@ -214,16 +207,6 @@ public boolean actMessage(Transaction tx, MessageReference ref) throws Exception
};
}

@Override
public boolean isLastValue() {
return true;
}

@Override
public SimpleString getLastValueKey() {
return lastValueKey;
}

public synchronized Set<SimpleString> getLastValueKeys() {
return Collections.unmodifiableSet(map.keySet());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ private void checkIDSupplier(NodeStoreFactory<MessageReference> nodeStoreFactory

private final int initialQueueBufferSize;

private final QueueConfiguration queueConfiguration;
protected final QueueConfiguration queueConfiguration;

@Override
public boolean isSwept() {
Expand Down Expand Up @@ -385,7 +385,7 @@ public QueueImpl(final QueueConfiguration queueConfiguration,

this.createdTimestamp = System.currentTimeMillis();

this.queueConfiguration = queueConfiguration;
this.queueConfiguration = QueueConfiguration.of(queueConfiguration);
QueueConfigurationUtils.applyStaticDefaults(this.queueConfiguration);

this.refCountForConsumers = this.queueConfiguration.isTransient() ? new TransientQueueManagerImpl(server, this.queueConfiguration.getName()) : new QueueManagerImpl(server, this.queueConfiguration.getName());
Expand Down Expand Up @@ -537,12 +537,12 @@ public synchronized void setDispatching(boolean dispatching) {

@Override
public boolean isLastValue() {
return false;
return queueConfiguration.isLastValue();
}

@Override
public SimpleString getLastValueKey() {
return null;
return queueConfiguration.getLastValueKey();
}

@Override
Expand Down Expand Up @@ -4081,7 +4081,7 @@ public static MessageGroups<Consumer> groupMap(int groupBuckets) {

@Override
public QueueConfiguration getQueueConfiguration() {
return queueConfiguration;
return QueueConfiguration.of(queueConfiguration);
}

protected static class ConsumerHolder<T extends Consumer> implements PriorityAware {
Expand Down

0 comments on commit 458292c

Please sign in to comment.