Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARTEMIS-5302 extra fixes for using QueueConfiguration more #5507

Merged
merged 1 commit into from
Feb 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,11 @@ public static QueueConfiguration of(final QueueConfiguration queueConfiguration)
return new QueueConfiguration(queueConfiguration);
}

/**
* @deprecated
* Use {@link #of(String)} instead.
*/
@Deprecated(forRemoval = true)
public QueueConfiguration() {
}

Expand Down Expand Up @@ -410,15 +415,27 @@ public SimpleString getFilterString() {
return filterString;
}

/**
* This sets the {@code SimpleString} value that will be used to create a {@code Filter} for the {@code Queue}
* implementation on the broker. The filter's syntax is not validated here.
* @param filterString the filter to use; an empty value or a value filled with whitespace is equivalent to passing
* {@code null}
* @return this {@code QueueConfiguration}
*/
public QueueConfiguration setFilterString(SimpleString filterString) {
if (filterString != null && !filterString.isEmpty() && !filterString.isBlank()) {
this.filterString = filterString;
} else if (filterString == null) {
if (filterString == null || filterString.isEmpty() || filterString.isBlank()) {
this.filterString = null;
} else {
this.filterString = filterString;
}
return this;
}

/**
* Converts the {@code String} parameter to {@code SimpleString} and invokes
* {@link #setFilterString(SimpleString)}
* @see #setFilterString(SimpleString)
*/
public QueueConfiguration setFilterString(String filterString) {
return setFilterString(SimpleString.of(filterString));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
*/
package org.apache.activemq.artemis.utils;

import java.util.Arrays;
import java.util.List;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import org.apache.activemq.artemis.api.core.SimpleString;
Expand Down Expand Up @@ -43,20 +46,27 @@ public void testOutOfBoundsThrownOnMalformedString() {

@Test
public void testBlank() {
for (int i = 0; i <= 10; i++) {
assertTrue(SimpleString.of(" ".repeat(i)).isBlank());
}
for (int i = 0; i <= 10; i++) {
assertTrue(SimpleString.of("\t".repeat(i)).isBlank());
}
for (int i = 0; i <= 10; i++) {
assertTrue(SimpleString.of("\n".repeat(i)).isBlank());
}
for (int i = 0; i <= 10; i++) {
assertTrue(SimpleString.of("\r".repeat(i)).isBlank());
List<String> whitespace = Arrays.asList(" ", "\t", "\n", "\r");

// check empty and pure whitespace
for (String s : whitespace) {
for (int i = 0; i <= 10; i++) {
assertTrue(SimpleString.of(s.repeat(i)).isBlank());
}
}

// check pure non-whitespace
for (int i = 1; i <= 10; i++) {
assertFalse(SimpleString.of("x".repeat(i)).isBlank());
}

// check a mix of both whitespace and non-whitepsace
for (String s : whitespace) {
for (int i = 1; i <= 10; i++) {
assertFalse(SimpleString.of(s + "x".repeat(i)).isBlank());
assertFalse(SimpleString.of("x".repeat(i) + s).isBlank());
assertFalse(SimpleString.of(s + "x".repeat(i) + s).isBlank());
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
private List<QueueStatusEncoding> queueStatusEncodings;

public PersistentQueueBindingEncoding() {
config = new QueueConfiguration();
}

@Override
Expand Down Expand Up @@ -67,7 +66,7 @@ public List<QueueStatusEncoding> getQueueStatusEncodings() {

@Override
public void decode(final ActiveMQBuffer buffer) {
config.setName(buffer.readSimpleString());
config = QueueConfiguration.of(buffer.readSimpleString());
config.setAddress(buffer.readSimpleString());
config.setFilterString(buffer.readNullableSimpleString());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
public class LastValueQueue extends QueueImpl {

private final Map<SimpleString, MessageReference> map = new ConcurrentHashMap<>();
private final SimpleString lastValueKey;

public LastValueQueue(final QueueConfiguration queueConfiguration,
final Filter filter,
Expand All @@ -66,7 +65,6 @@ public LastValueQueue(final QueueConfiguration queueConfiguration,
final ActiveMQServer server,
final QueueFactory factory) {
super(queueConfiguration, filter, pagingStore, pageSubscription, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor, server, factory);
this.lastValueKey = queueConfiguration.getLastValueKey();
}

@Override
Expand Down Expand Up @@ -121,11 +119,6 @@ public boolean allowsReferenceCallback() {
return false;
}

@Override
public QueueConfiguration getQueueConfiguration() {
return super.getQueueConfiguration().setLastValue(true).setLastValueKey(lastValueKey);
}

@Override
protected void pruneLastValues() {
// called with synchronized(this) from super.deliver()
Expand Down Expand Up @@ -214,16 +207,6 @@ public boolean actMessage(Transaction tx, MessageReference ref) throws Exception
};
}

@Override
public boolean isLastValue() {
return true;
}

@Override
public SimpleString getLastValueKey() {
return lastValueKey;
}

public synchronized Set<SimpleString> getLastValueKeys() {
return Collections.unmodifiableSet(map.keySet());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ private void checkIDSupplier(NodeStoreFactory<MessageReference> nodeStoreFactory

private final int initialQueueBufferSize;

private final QueueConfiguration queueConfiguration;
protected final QueueConfiguration queueConfiguration;

@Override
public boolean isSwept() {
Expand Down Expand Up @@ -385,7 +385,7 @@ public QueueImpl(final QueueConfiguration queueConfiguration,

this.createdTimestamp = System.currentTimeMillis();

this.queueConfiguration = queueConfiguration;
this.queueConfiguration = QueueConfiguration.of(queueConfiguration);
QueueConfigurationUtils.applyStaticDefaults(this.queueConfiguration);

this.refCountForConsumers = this.queueConfiguration.isTransient() ? new TransientQueueManagerImpl(server, this.queueConfiguration.getName()) : new QueueManagerImpl(server, this.queueConfiguration.getName());
Expand Down Expand Up @@ -537,12 +537,12 @@ public synchronized void setDispatching(boolean dispatching) {

@Override
public boolean isLastValue() {
return false;
return queueConfiguration.isLastValue();
}

@Override
public SimpleString getLastValueKey() {
return null;
return queueConfiguration.getLastValueKey();
}

@Override
Expand Down Expand Up @@ -4081,7 +4081,7 @@ public static MessageGroups<Consumer> groupMap(int groupBuckets) {

@Override
public QueueConfiguration getQueueConfiguration() {
return queueConfiguration;
return QueueConfiguration.of(queueConfiguration);
}

protected static class ConsumerHolder<T extends Consumer> implements PriorityAware {
Expand Down